gianm commented on code in PR #16039:
URL: https://github.com/apache/druid/pull/16039#discussion_r1530892864


##########
sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/ArrayOverlapOperatorConversion.java:
##########
@@ -127,7 +130,7 @@ public DimFilter toDruidFilter(
         // to create an empty array with no argument, we just return null.
         return null;
       } else if (arrayElements.length == 1) {
-        if (plannerContext.isUseBoundsAndSelectors()) {
+        if (plannerContext.isUseBoundsAndSelectors() || (!simpleExtractionExpr.isDirectColumnAccess() && virtualColumnRegistry == null)) {

Review Comment:
   When is `virtualColumnRegistry` null?
   
   And if it can be null, then I think CodeQL makes a good point: could it also be null in the `else` branch, where there is no `null` guard?



##########
processing/src/main/java/org/apache/druid/segment/index/semantic/ValueSetIndexes.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.index.semantic;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.segment.column.TypeSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.SimpleImmutableBitmapDelegatingIterableIndex;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public interface ValueSetIndexes
+{
+  double SORTED_MERGE_RATIO_THRESHOLD = 0.12D;
+  int SIZE_WORTH_CHECKING_MIN = 8;
+
+  /**
+   * Get the wrapped {@link ImmutableBitmap} corresponding to the specified set of values (if they are contained in the
+   * underlying column). The set must be sorted using the comparator of the supplied matchValueType.
+   *
+   * @param sortedValues   values to match, sorted in matchValueType order
+   * @param matchValueType type of the value to match, used to assist conversion from the match value type to the column
+   *                       value type
+   * @return               {@link ImmutableBitmap} corresponding to the rows which match the values, or null if an index
+   *                       cannot be computed for the supplied value type
+   */
+  @Nullable
+  BitmapColumnIndex forSortedValues(@Nonnull List<?> sortedValues, TypeSignature<ValueType> matchValueType);
+
+  static <T> BitmapColumnIndex getIndexFromSortedIteratorSortedMerged(

Review Comment:
   Javadoc for this and `getIndexFromSortedIterator`? For some reason, the names don't make it obvious to me what they do. (One is regular and one is `SortedMerged`?)
   
   Reading the code, it looks like this one does a linear scan through the dictionary, and the other one does a series of binary searches. Maybe call them `getIndexFromSortedIteratorWithScan` and `getIndexFromSortedIteratorWithBinarySearch`.
   
   Oh, now I see why it's called `SortedMerged`. The zipping is kind of like the merge step of a merge-sort. Still, I feel the "scan" and "binary search" names would make more sense. And Javadoc would definitely help.
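   
   For reference, a minimal sketch of the two strategies described above. It uses plain `java.util` lists in place of Druid's `Indexed` dictionary, purely so the example is self-contained. `SortedLookupSketch`, `matchWithScan` and `matchWithBinarySearch` are hypothetical names, not methods from the PR, and the sketch assumes both inputs are sorted in natural order with distinct values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SortedLookupSketch
{
  /**
   * Zips two sorted lists together, like the merge step of a merge-sort.
   * Returns the dictionary positions of the values that were found.
   */
  static List<Integer> matchWithScan(List<String> sortedValues, List<String> sortedDictionary)
  {
    final List<Integer> matches = new ArrayList<>();
    int v = 0;
    int d = 0;
    while (v < sortedValues.size() && d < sortedDictionary.size()) {
      final int comparison = sortedValues.get(v).compareTo(sortedDictionary.get(d));
      if (comparison == 0) {
        matches.add(d);
        v++;
      } else if (comparison < 0) {
        // value is smaller than the current dictionary entry, so it is absent
        v++;
      } else {
        d++;
      }
    }
    return matches;
  }

  /**
   * Runs one binary search per value. Stops early once a value would be
   * inserted past the end of the dictionary, since later values are larger.
   */
  static List<Integer> matchWithBinarySearch(List<String> sortedValues, List<String> sortedDictionary)
  {
    final List<Integer> matches = new ArrayList<>();
    for (String value : sortedValues) {
      final int index = Collections.binarySearch(sortedDictionary, value);
      if (index >= 0) {
        matches.add(index);
      } else if (index == -sortedDictionary.size() - 1) {
        break;
      }
    }
    return matches;
  }

  public static void main(String[] args)
  {
    final List<String> dictionary = Arrays.asList("a", "c", "e", "g");
    final List<String> values = Arrays.asList("b", "c", "g", "z");
    System.out.println(matchWithScan(values, dictionary));
    System.out.println(matchWithBinarySearch(values, dictionary));
  }
}
```

   Both calls return `[1, 3]` for the sample input. The scan visits each dictionary entry at most once, so it tends to pay off when the value set is large relative to the dictionary. The binary-search form does one lookup per value.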



##########
processing/src/main/java/org/apache/druid/segment/index/semantic/ValueSetIndexes.java:
##########
@@ -0,0 +1,273 @@
+package org.apache.druid.segment.index.semantic;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.segment.column.TypeSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.SimpleImmutableBitmapDelegatingIterableIndex;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public interface ValueSetIndexes
+{
+  double SORTED_MERGE_RATIO_THRESHOLD = 0.12D;
+  int SIZE_WORTH_CHECKING_MIN = 8;
+
+  /**
+   * Get the wrapped {@link ImmutableBitmap} corresponding to the specified set of values (if they are contained in the
+   * underlying column). The set must be sorted using the comparator of the supplied matchValueType.
+   *
+   * @param sortedValues   values to match, sorted in matchValueType order
+   * @param matchValueType type of the value to match, used to assist conversion from the match value type to the column
+   *                       value type
+   * @return               {@link ImmutableBitmap} corresponding to the rows which match the values, or null if an index
+   *                       cannot be computed for the supplied value type
+   */
+  @Nullable
+  BitmapColumnIndex forSortedValues(@Nonnull List<?> sortedValues, TypeSignature<ValueType> matchValueType);
+
+  static <T> BitmapColumnIndex getIndexFromSortedIteratorSortedMerged(
+      BitmapFactory bitmapFactory,
+      Comparator<T> comparator,
+      Iterable<T> values,
+      Indexed<T> dictionary,
+      Indexed<ImmutableBitmap> bitmaps,
+      Supplier<ImmutableBitmap> unknownsBitmap
+  )
+  {
+    return new BaseValueSetIndexesFromIterable(bitmapFactory, bitmaps, unknownsBitmap)
+    {
+      @Override
+      public Iterable<ImmutableBitmap> getBitmapIterable()
+      {
+        return () -> new Iterator<ImmutableBitmap>()
+        {
+          final PeekingIterator<T> valuesIterator = Iterators.peekingIterator(values.iterator());
+          final PeekingIterator<T> dictionaryIterator = Iterators.peekingIterator(dictionary.iterator());
+          int next = -1;
+          int idx = 0;
+
+          @Override
+          public boolean hasNext()
+          {
+            if (next < 0) {
+              findNext();
+            }
+            return next >= 0;
+          }
+
+          @Override
+          public ImmutableBitmap next()
+          {
+            if (next < 0) {
+              findNext();
+              if (next < 0) {
+                throw new NoSuchElementException();
+              }
+            }
+            final int swap = next;
+            next = -1;
+            return getBitmap(swap);
+          }
+
+          private void findNext()
+          {
+            while (next < 0 && valuesIterator.hasNext() && dictionaryIterator.hasNext()) {
+              final T nextValue = valuesIterator.peek();
+              final T nextDictionaryKey = dictionaryIterator.peek();
+              final int comparison = comparator.compare(nextValue, nextDictionaryKey);
+              if (comparison == 0) {
+                next = idx;
+                valuesIterator.next();
+                break;
+              } else if (comparison < 0) {
+                valuesIterator.next();
+              } else {
+                dictionaryIterator.next();
+                idx++;
+              }
+            }
+          }
+        };
+      }
+    };
+  }
+
+  static <T> BitmapColumnIndex getIndexFromSortedIterator(
+      BitmapFactory bitmapFactory,
+      Iterable<T> values,
+      Indexed<T> dictionary,
+      Indexed<ImmutableBitmap> bitmaps,
+      Supplier<ImmutableBitmap> getUnknownsIndex
+  )
+  {
+    return new BaseValueSetIndexesFromIterable(bitmapFactory, bitmaps, getUnknownsIndex)
+    {
+      @Override
+      public Iterable<ImmutableBitmap> getBitmapIterable()
+      {
+        return () -> new Iterator<ImmutableBitmap>()
+        {
+          final int dictionarySize = dictionary.size();
+          final Iterator<T> iterator = values.iterator();
+          int next = -1;
+
+          @Override
+          public boolean hasNext()
+          {
+            if (next < 0) {
+              findNext();
+            }
+            return next >= 0;
+          }
+
+          @Override
+          public ImmutableBitmap next()
+          {
+            if (next < 0) {
+              findNext();
+              if (next < 0) {
+                throw new NoSuchElementException();
+              }
+            }
+            final int swap = next;
+            next = -1;
+            return getBitmap(swap);
+          }
+
+          private void findNext()
+          {
+            while (next < 0 && iterator.hasNext()) {
+              T nextValue = iterator.next();
+              next = dictionary.indexOf(nextValue);
+
+              if (next == -dictionarySize - 1) {
+                // nextValue is past the end of the dictionary so we can break early
+                // Note: we can rely on indexOf returning (-(insertion point) - 1), because of the earlier check
+                // for Indexed.isSorted(), which guarantees this behavior
+                break;
+              }
+            }
+          }
+        };
+      }
+    };
+  }
+
+  static <T> BitmapColumnIndex getIndexFromIterator(

Review Comment:
   Javadoc would be useful for this one too.



##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,713 @@
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrays;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  @Nullable
+  private final List<?> unsortedValues;
+  private final Supplier<List<?>> lazyMatchValues;
+  @Nullable
+  private final Supplier<SortedSet<ByteBuffer>> lazyMatchValueBytes;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final Supplier<DruidPredicateFactory> predicateFactorySupplier;
+
+  @JsonIgnore
+  private final Supplier<byte[]> cacheKeySupplier;
+
+  /**
+   * Creates a new filter.
+   *
+   * @param column         column to search
+   * @param values         set of values to match. This collection may be reused to avoid copying a big collection.
+   *                       Therefore, callers should <b>not</b> modify the collection after it is passed to this
+   *                       constructor.
+   * @param matchValueType type of values contained in set
+   * @param filterTuning   optional tuning
+   */
+  @JsonCreator
+  public TypedInFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("values") @Nullable List<?> values,
+      @JsonProperty("sortedValues") @Nullable List<?> sortedValues,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (NullHandling.replaceWithDefault()) {
+      throw InvalidInput.exception(
+          "Invalid IN filter, typed in filter only supports SQL compatible null handling mode, set druid.generic.useDefaultValue=false to use this filter"
+      );
+    }
+    this.column = column;
+    if (column == null) {
+      throw InvalidInput.exception("Invalid IN filter, column cannot be null");
+    }
+    this.filterTuning = filterTuning;
+    this.matchValueType = matchValueType;
+    if (matchValueType == null) {
+      throw InvalidInput.exception("Invalid IN filter on column [%s], matchValueType cannot be null", column);
+    }
+    // one of sorted or not sorted
+    if (values == null && sortedValues == null) {
+      throw InvalidInput.exception(
+          "Invalid IN filter on column [%s], exactly one of values or sortedValues must be non-null",
+          column
+      );
+    }
+    if (sortedValues != null) {
+      this.unsortedValues = null;
+      this.lazyMatchValues = () -> sortedValues;
+    } else {
+      if (checkSorted(values, matchValueType)) {
+        this.unsortedValues = null;
+        this.lazyMatchValues = () -> values;
+      } else {
+        this.unsortedValues = values;
+        this.lazyMatchValues = Suppliers.memoize(() -> sortValues(unsortedValues, matchValueType));
+      }
+    }
+    if (matchValueType.is(ValueType.STRING)) {
+      this.lazyMatchValueBytes = Suppliers.memoize(() -> {
+        final SortedSet<ByteBuffer> matchValueBytes = new ObjectAVLTreeSet<>(ByteBufferUtils.utf8Comparator());
+        for (Object s : lazyMatchValues.get()) {
+          matchValueBytes.add(StringUtils.toUtf8ByteBuffer(Evals.asString(s)));
+        }
+        return matchValueBytes;
+      });
+    } else {
+      this.lazyMatchValueBytes = null;
+    }
+
+    this.predicateFactorySupplier = Suppliers.memoize(
+        () -> new PredicateFactory(lazyMatchValues.get(), matchValueType)
+    );
+    this.cacheKeySupplier = Suppliers.memoize(this::computeCacheKey);
+  }
+
+  @JsonProperty
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty
+  public List<?> getSortedValues()
+  {
+    return lazyMatchValues.get();
+  }
+
+  @JsonProperty
+  public ColumnType getMatchValueType()
+  {
+    return matchValueType;
+  }
+
+  @Nullable
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @JsonProperty
+  public FilterTuning getFilterTuning()
+  {
+    return filterTuning;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return cacheKeySupplier.get();
+  }
+
+  @Override
+  public DimFilter optimize(final boolean mayIncludeUnknown)
+  {
+    final List<?> matchValues = lazyMatchValues.get();
+    if (matchValues.isEmpty()) {
+      return FalseDimFilter.instance();
+    } else if (matchValues.size() == 1) {
+      if (matchValues.get(0) == null) {
+        return NullFilter.forColumn(column);
+      }
+      return new EqualityFilter(
+          column,
+          matchValueType,
+          matchValues.iterator().next(),
+          filterTuning
+      );
+    }
+    return this;
+  }
+
+  @Override
+  public Filter toFilter()
+  {
+    return this;
+  }
+
+  @Nullable
+  @Override
+  public RangeSet<String> getDimensionRangeSet(String dimension)
+  {
+    if (!Objects.equals(getColumn(), dimension)) {
+      return null;
+    }
+    RangeSet<String> retSet = TreeRangeSet.create();
+    for (Object value : lazyMatchValues.get()) {
+      String valueEquivalent = NullHandling.nullToEmptyIfNeeded(Evals.asString(value));
+      if (valueEquivalent == null) {
+        // Case when SQL compatible null handling is enabled
+        // Range.singleton(null) is invalid, so use the fact that
+        // only null values are less than empty string.
+        retSet.add(Range.lessThan(""));
+      } else {
+        retSet.add(Range.singleton(valueEquivalent));
+      }
+    }
+    return retSet;
+  }
+
+  @Override
+  public Set<String> getRequiredColumns()
+  {
+    return ImmutableSet.of(column);
+  }
+
+  @Override
+  @Nullable
+  public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
+  {
+    if (!Filters.checkFilterTuningUseIndex(column, selector, filterTuning)) {
+      return null;
+    }
+    final ColumnIndexSupplier indexSupplier = selector.getIndexSupplier(column);
+
+    if (indexSupplier == null) {
+      // column doesn't exist, match against null
+      DruidPredicateMatch match = predicateFactorySupplier.get().makeStringPredicate().apply(null);
+      return Filters.makeMissingColumnNullIndex(match, selector);
+    }
+
+    if (lazyMatchValueBytes != null) {
+      final Utf8ValueSetIndexes utf8ValueSetIndexes = indexSupplier.as(Utf8ValueSetIndexes.class);
+      if (utf8ValueSetIndexes != null) {
+        return utf8ValueSetIndexes.forSortedValuesUtf8(lazyMatchValueBytes.get());
+      }
+    }
+
+    final ValueSetIndexes valueSetIndexes = indexSupplier.as(ValueSetIndexes.class);
+    if (valueSetIndexes != null) {
+      return valueSetIndexes.forSortedValues(lazyMatchValues.get(), matchValueType);
+    }
+
+    return Filters.makePredicateIndex(
+        column,
+        selector,
+        predicateFactorySupplier.get()
+    );
+  }
+
+  @Override
+  public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
+  {
+    return Filters.makeValueMatcher(factory, column, predicateFactorySupplier.get());
+  }
+
+  @Override
+  public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
+  {
+    return ColumnProcessors.makeVectorProcessor(
+        column,
+        VectorValueMatcherColumnProcessorFactory.instance(),
+        factory
+    ).makeMatcher(predicateFactorySupplier.get());
+  }
+
+  @Override
+  public boolean canVectorizeMatcher(ColumnInspector inspector)
+  {
+    return true;
+  }
+
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(column);
+    if (rewriteDimensionTo == null) {
+      throw new IAE("Received a non-applicable rewrite: %s, filter's dimension: %s", columnRewrites, column);
+    }
+
+    if (rewriteDimensionTo.equals(column)) {
+      return this;
+    } else {
+      return new TypedInFilter(
+          rewriteDimensionTo,
+          matchValueType,
+          null,
+          lazyMatchValues.get(),
+          filterTuning
+      );
+    }
+  }
+
+  @Override
+  public String toString()
+  {
+    final DimFilter.DimFilterToStringBuilder builder = new DimFilter.DimFilterToStringBuilder();
+    return builder.appendDimension(column, null)
+                  .append(" IN (")
+                  .append(Joiner.on(", ").join(Iterables.transform(lazyMatchValues.get(), String::valueOf)))
+                  .append(")")
+                  .append(" (" + matchValueType + ")")
+                  .appendFilterTuning(filterTuning)
+                  .build();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TypedInFilter that = (TypedInFilter) o;
+    return column.equals(that.column) &&
+           Objects.equals(matchValueType, that.matchValueType) &&
+           compareValues(lazyMatchValues.get(), that.lazyMatchValues.get(), matchValueType) &&
+           Objects.equals(filterTuning, that.filterTuning);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(lazyMatchValues.get(), column, matchValueType, filterTuning);
+  }
+
+  private byte[] computeCacheKey()
+  {
+    // Hash all values, in sorted order, as their length followed by their content.
+    final Hasher hasher = Hashing.sha256().newHasher();
+    for (Object v : lazyMatchValues.get()) {
+      if (v == null) {
+        // Encode null as length -1, no content.
+        hasher.putInt(-1);
+      } else {
+        final String s = Evals.asString(v);
+        hasher.putInt(s.length());
+        hasher.putString(s, StandardCharsets.UTF_8);
+      }
+    }
+
+    return new CacheKeyBuilder(DimFilterUtils.NEW_IN_CACHE_ID)
+        .appendString(column)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendString(matchValueType.asTypeString())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(hasher.hash().asBytes())
+        .build();
+  }
+
+  private static boolean checkSorted(List<?> unsortedValues, ColumnType matchValueType)
+  {
+    final Comparator<Object> comparator = matchValueType.getNullableStrategy();
+    Object prev = null;
+    boolean needsCoerceCheck = true;
+    for (Object o : unsortedValues) {
+      if (needsCoerceCheck && o != null) {
+        Object coerced = coerceValue(o, matchValueType);
+        //noinspection ObjectEquality
+        if (coerced != o) {
+          return false;
+        }
+        needsCoerceCheck = false;
+      }
+      if (prev != null && comparator.compare(prev, o) >= 0) {
+        return false;
+      }
+      prev = o;
+    }
+    return true;
+  }
+
+  @Nullable
+  private static Object coerceValue(@Nullable Object o, ColumnType matchValueType)
+  {
+    if (o == null) {
+      return o;
+    }
+    switch (matchValueType.getType()) {
+      case STRING:
+        return DimensionHandlerUtils.convertObjectToString(o);
+      case LONG:
+        return DimensionHandlerUtils.convertObjectToLong(o);
+      case FLOAT:
+        return DimensionHandlerUtils.convertObjectToFloat(o);
+      case DOUBLE:
+        return DimensionHandlerUtils.convertObjectToDouble(o);
+      default:
+        throw InvalidInput.exception("Unsupported matchValueType[%s]", matchValueType);
+    }
+  }
+
+  private static List<?> sortValues(List<?> unsortedValues, ColumnType matchValueType)
+  {
+    final Object[] array = unsortedValues.toArray(new Object[0]);
+    // check if values need coerced
+    for (int i = 0; i < array.length; i++) {
+      Object coerced = coerceValue(array[i], matchValueType);
+      //noinspection ObjectEquality
+      if (coerced != null && array[i] == coerced) {
+        // assume list is all same type objects...

Review Comment:
   Is this going to cause issues when reading JSON that has `values` set rather than `sortedValues`? (Where they might not all be the same type)
   
   Also, do we care? I suppose if we aren't documenting this, then there's no way we'd get JSON with `values` in it. The serializer always writes `sortedValues`. You know, though, if this is the thinking, then we should really also split the constructor into two: a `@JsonCreator` that takes `sortedValues` only (not `values`) and a non-Jackson-enabled constructor that accepts possibly-unsorted values. No sense in supporting `values` in the JSON if we aren't going to use it.
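   
   A rough sketch of the split suggested above, written without Jackson so it stays self-contained. `InFilterSketch` and `fromUnsorted` are hypothetical names, and the sketch uses a static factory for the unsorted path only because two constructors with the same parameter list would clash in this reduced form. Treating values as strings sorted nulls-first is also an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class InFilterSketch
{
  private final String column;
  private final List<String> sortedValues;

  // Serialization-facing constructor. In the real class, this is where
  // @JsonCreator and @JsonProperty("sortedValues") would go, so the JSON
  // form never has to accept a "values" property.
  InFilterSketch(String column, List<String> sortedValues)
  {
    if (column == null) {
      throw new IllegalArgumentException("column cannot be null");
    }
    if (sortedValues == null) {
      throw new IllegalArgumentException("sortedValues cannot be null");
    }
    this.column = column;
    this.sortedValues = sortedValues;
  }

  // Programmatic entry point for possibly-unsorted input. It copies and
  // sorts before delegating, so the constructor can trust its argument.
  static InFilterSketch fromUnsorted(String column, List<String> values)
  {
    final List<String> copy = new ArrayList<>(values);
    copy.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
    return new InFilterSketch(column, copy);
  }

  String getColumn()
  {
    return column;
  }

  List<String> getSortedValues()
  {
    return sortedValues;
  }

  public static void main(String[] args)
  {
    final InFilterSketch filter = fromUnsorted("x", Arrays.asList("b", null, "a"));
    System.out.println(filter.getColumn() + " IN " + filter.getSortedValues());
  }
}
```

   With this shape, only the sorted form can arrive through deserialization, and the coercion and sorting logic lives in one place that JSON never reaches.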



##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,713 @@
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrays;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  @Nullable
+  private final List<?> unsortedValues;
+  private final Supplier<List<?>> lazyMatchValues;
+  @Nullable
+  private final Supplier<SortedSet<ByteBuffer>> lazyMatchValueBytes;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final Supplier<DruidPredicateFactory> predicateFactorySupplier;
+
+  @JsonIgnore
+  private final Supplier<byte[]> cacheKeySupplier;
+
+  /**
+   * Creates a new filter.
+   *
+   * @param column         column to search
+   * @param values         set of values to match. This collection may be reused to avoid copying a big collection.
+   *                       Therefore, callers should <b>not</b> modify the collection after it is passed to this
+   *                       constructor.
+   * @param matchValueType type of values contained in set
+   * @param filterTuning   optional tuning
+   */
+  @JsonCreator
+  public TypedInFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("values") @Nullable List<?> values,
+      @JsonProperty("sortedValues") @Nullable List<?> sortedValues,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (NullHandling.replaceWithDefault()) {
+      throw InvalidInput.exception(
+          "Invalid IN filter, typed in filter only supports SQL compatible null handling mode, set druid.generic.useDefaultValue=false to use this filter"
+      );
+    }
+    this.column = column;
+    if (column == null) {
+      throw InvalidInput.exception("Invalid IN filter, column cannot be null");
+    }
+    this.filterTuning = filterTuning;
+    this.matchValueType = matchValueType;
+    if (matchValueType == null) {
+      throw InvalidInput.exception("Invalid IN filter on column [%s], 
matchValueType cannot be null", column);
+    }
+    // one of sorted or not sorted
+    if (values == null && sortedValues == null) {
+      throw InvalidInput.exception(
+          "Invalid IN filter on column [%s], exactly one of values or 
sortedValues must be non-null",
+          column
+      );
+    }
+    if (sortedValues != null) {
+      this.unsortedValues = null;
+      this.lazyMatchValues = () -> sortedValues;
+    } else {
+      if (checkSorted(values, matchValueType)) {
+        this.unsortedValues = null;
+        this.lazyMatchValues = () -> values;
+      } else {
+        this.unsortedValues = values;
+        this.lazyMatchValues = Suppliers.memoize(() -> 
sortValues(unsortedValues, matchValueType));
+      }
+    }
+    if (matchValueType.is(ValueType.STRING)) {
+      this.lazyMatchValueBytes = Suppliers.memoize(() -> {
+        final SortedSet<ByteBuffer> matchValueBytes = new 
ObjectAVLTreeSet<>(ByteBufferUtils.utf8Comparator());

Review Comment:
   Couldn't this be a `List` rather than a tree set? The values are already in 
the right order thanks to `lazyMatchValues.get()`.
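   
   A minimal sketch of the list-based alternative, under two assumptions that 
would need checking against the real code: the input is already sorted and 
duplicate-free, and the order of the strings matches the order expected for 
their UTF-8 encodings. `Utf8ListSketch` and `encodeSorted` are hypothetical 
names, not part of this PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of the PR.
final class Utf8ListSketch
{
  // Encodes already-sorted strings to UTF-8, keeping the input order.
  // A null element is kept as a null entry.
  static List<ByteBuffer> encodeSorted(List<String> sortedValues)
  {
    final List<ByteBuffer> encoded = new ArrayList<>(sortedValues.size());
    for (String value : sortedValues) {
      encoded.add(value == null ? null : ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)));
    }
    return encoded;
  }

  public static void main(String[] args)
  {
    final List<ByteBuffer> encoded = encodeSorted(Arrays.asList(null, "a", "b"));
    System.out.println(encoded.size());
  }
}
```

   This skips the per-element comparisons of a tree insert, but it only works 
if nothing downstream relies on `SortedSet` operations.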



##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrays;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import 
org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements 
Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  @Nullable
+  private final List<?> unsortedValues;
+  private final Supplier<List<?>> lazyMatchValues;
+  @Nullable
+  private final Supplier<SortedSet<ByteBuffer>> lazyMatchValueBytes;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final Supplier<DruidPredicateFactory> predicateFactorySupplier;
+
+  @JsonIgnore
+  private final Supplier<byte[]> cacheKeySupplier;
+
+  /**
+   * Creates a new filter.
+   *
+   * @param column         column to search
+   * @param values         set of values to match. This collection may be 
reused to avoid copying a big collection.
+   *                       Therefore, callers should <b>not</b> modify the 
collection after it is passed to this
+   *                       constructor.
+   * @param matchValueType type of values contained in set
+   * @param filterTuning   optional tuning
+   */
+  @JsonCreator
+  public TypedInFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("values") @Nullable List<?> values,
+      @JsonProperty("sortedValues") @Nullable List<?> sortedValues,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (NullHandling.replaceWithDefault()) {
+      throw InvalidInput.exception(
+          "Invalid IN filter, typed in filter only supports SQL compatible 
null handling mode, set druid.generic.useDefaultValue=false to use this filter"
+      );
+    }
+    this.column = column;
+    if (column == null) {
+      throw InvalidInput.exception("Invalid IN filter, column cannot be null");
+    }
+    this.filterTuning = filterTuning;
+    this.matchValueType = matchValueType;
+    if (matchValueType == null) {
+      throw InvalidInput.exception("Invalid IN filter on column [%s], 
matchValueType cannot be null", column);
+    }
+    // one of sorted or not sorted
+    if (values == null && sortedValues == null) {
+      throw InvalidInput.exception(
+          "Invalid IN filter on column [%s], exactly one of values or 
sortedValues must be non-null",
+          column
+      );
+    }
+    if (sortedValues != null) {
+      this.unsortedValues = null;
+      this.lazyMatchValues = () -> sortedValues;
+    } else {
+      if (checkSorted(values, matchValueType)) {

Review Comment:
   Do we need to check for duplicates too, or are duplicates ok in 
`lazyMatchValues`? Similar question for `sortValues`: does it need to dedupe or 
is just sorting ok?
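   
   If duplicates do turn out to matter, one option is to make both paths 
enforce "sorted and distinct". The sketch below is only an illustration with 
hypothetical names (`SortDedupeSketch`, `sortAndDedupe`, 
`isSortedAndDistinct`); it assumes a comparator that tolerates `null`, and it 
is not the PR's `sortValues` or `checkSorted`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical helpers for illustration only; not part of the PR.
final class SortDedupeSketch
{
  // Sorts a copy of the input and drops elements that compare equal to their predecessor.
  static <T> List<T> sortAndDedupe(List<T> values, Comparator<? super T> comparator)
  {
    final List<T> sorted = new ArrayList<>(values);
    sorted.sort(comparator);
    final List<T> result = new ArrayList<>(sorted.size());
    for (T value : sorted) {
      if (result.isEmpty() || comparator.compare(result.get(result.size() - 1), value) != 0) {
        result.add(value);
      }
    }
    return result;
  }

  // True only if the list is strictly increasing, i.e. sorted and duplicate-free.
  static <T> boolean isSortedAndDistinct(List<T> values, Comparator<? super T> comparator)
  {
    for (int i = 1; i < values.size(); i++) {
      if (comparator.compare(values.get(i - 1), values.get(i)) >= 0) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    final Comparator<Long> cmp = Comparator.nullsFirst(Comparator.<Long>naturalOrder());
    System.out.println(sortAndDedupe(Arrays.asList(3L, null, 1L, 3L), cmp));
  }
}
```

   Using a strict comparison in the sortedness check means a list with 
duplicates falls through to the sorting path, where the duplicates are removed.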



##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrays;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import 
org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements 
Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  @Nullable
+  private final List<?> unsortedValues;

Review Comment:
   Some Javadoc about this and the others (`lazyMatchValues`, 
`lazyMatchValueBytes`) would be helpful, explaining the invariants in play. 
Reading the rest of the code, I think it's something like: 
   
   - `lazyMatchValues` is always set and always returns a list sorted according 
to `matchValueType`
   - `unsortedValues` is set if we know that the values backing 
`lazyMatchValues` haven't been sorted yet
   - regardless of the above, when we serialize via JSON, the values are always 
sorted and always set under `sortedValues`. This is then trusted at 
deserialization time.
   
   What about duplicates, though? Might `unsortedValues` or `lazyMatchValues` 
contain duplicates?



##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrays;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import 
org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements 
Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  @Nullable
+  private final List<?> unsortedValues;
+  private final Supplier<List<?>> lazyMatchValues;
+  @Nullable
+  private final Supplier<SortedSet<ByteBuffer>> lazyMatchValueBytes;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final Supplier<DruidPredicateFactory> predicateFactorySupplier;
+
+  @JsonIgnore
+  private final Supplier<byte[]> cacheKeySupplier;
+
+  /**
+   * Creates a new filter.
+   *
+   * @param column         column to search
+   * @param values         set of values to match. This collection may be 
reused to avoid copying a big collection.
+   *                       Therefore, callers should <b>not</b> modify the 
collection after it is passed to this
+   *                       constructor.
+   * @param matchValueType type of values contained in set
+   * @param filterTuning   optional tuning
+   */
+  @JsonCreator
+  public TypedInFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("values") @Nullable List<?> values,
+      @JsonProperty("sortedValues") @Nullable List<?> sortedValues,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (NullHandling.replaceWithDefault()) {
+      throw InvalidInput.exception(
+          "Invalid IN filter, typed in filter only supports SQL compatible 
null handling mode, set druid.generic.useDefaultValue=false to use this filter"
+      );
+    }
+    this.column = column;
+    if (column == null) {
+      throw InvalidInput.exception("Invalid IN filter, column cannot be null");
+    }
+    this.filterTuning = filterTuning;
+    this.matchValueType = matchValueType;
+    if (matchValueType == null) {
+      throw InvalidInput.exception("Invalid IN filter on column [%s], 
matchValueType cannot be null", column);
+    }
+    // one of sorted or not sorted
+    if (values == null && sortedValues == null) {
+      throw InvalidInput.exception(
+          "Invalid IN filter on column [%s], exactly one of values or 
sortedValues must be non-null",
+          column
+      );
+    }
+    if (sortedValues != null) {
+      this.unsortedValues = null;
+      this.lazyMatchValues = () -> sortedValues;
+    } else {
+      if (checkSorted(values, matchValueType)) {
+        this.unsortedValues = null;
+        this.lazyMatchValues = () -> values;
+      } else {
+        this.unsortedValues = values;
+        this.lazyMatchValues = Suppliers.memoize(() -> 
sortValues(unsortedValues, matchValueType));
+      }
+    }
+    if (matchValueType.is(ValueType.STRING)) {
+      this.lazyMatchValueBytes = Suppliers.memoize(() -> {

Review Comment:
   If `lazyMatchValues` came in through `sortedValues` (indicating we're 
running on a data server or running with known-sorted data) then I think this 
should be eager too.
   
   The reason: if it's _not_ eager, it will probably be computed in a 
processing thread while processing some segment, and every other processing 
thread for the same query will block on it. (They'll likely all try to compute 
it at the same time.)
   
   It should probably stay lazy if `checkSorted` returns `true` but 
`sortedValues` was not set. That is likely to happen during SQL planning, where 
there's no need to do UTF-8 encoding.
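   
   Roughly what I have in mind, as a self-contained sketch. It uses 
`java.util.function.Supplier` and a small hand-written memoizer so that it runs 
without Guava; `EagerOrLazySketch`, `memoize`, and `memoizeMaybeEager` are 
hypothetical names, and the `eager` flag stands in for "the values arrived via 
`sortedValues`".

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not part of the PR.
final class EagerOrLazySketch
{
  // Minimal thread-safe memoizing supplier, similar in spirit to Guava's Suppliers.memoize.
  static <T> Supplier<T> memoize(Supplier<T> delegate)
  {
    return new Supplier<T>()
    {
      private volatile boolean initialized;
      private T value;

      @Override
      public T get()
      {
        if (!initialized) {
          synchronized (this) {
            if (!initialized) {
              value = delegate.get();
              initialized = true;
            }
          }
        }
        return value;
      }
    };
  }

  // When eager is true, the value is computed on the calling (constructing) thread,
  // so later callers never contend on the first computation.
  static <T> Supplier<T> memoizeMaybeEager(Supplier<T> delegate, boolean eager)
  {
    final Supplier<T> memoized = memoize(delegate);
    if (eager) {
      memoized.get();
    }
    return memoized;
  }

  public static void main(String[] args)
  {
    final AtomicInteger calls = new AtomicInteger();
    final Supplier<String> eager = memoizeMaybeEager(() -> "v" + calls.incrementAndGet(), true);
    System.out.println(calls.get() + " " + eager.get());
  }
}
```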



##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrays;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import 
org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements 
Filter

Review Comment:
   Javadoc on this class and on `InDimFilter` that point to each other would be 
good. People new to this area of the code base need to know that both exist.



##########
processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java:
##########
@@ -258,6 +265,70 @@ public <T> T computeBitmapResult(BitmapResultFactory<T> 
bitmapResultFactory, boo
     }
   }
 
+  private final class DoubleValueSetIndexes implements ValueSetIndexes
+  {
+    final FixedIndexed<Double> dictionary = doubleDictionarySupplier.get();
+
+    @Nullable
+    @Override
+    public BitmapColumnIndex forSortedValues(@Nonnull List<?> sortedValues, 
TypeSignature<ValueType> matchValueType)
+    {
+      final boolean matchNull = sortedValues.get(0) == null;
+      final Supplier<ImmutableBitmap> unknownsIndex = () -> {
+        if (!matchNull && dictionary.get(0) == null) {
+          return valueIndexes.get(0);
+        }
+        return null;
+      };
+
+      // values are doubles and ordered in double order
+      if (matchValueType.is(ValueType.DOUBLE)) {

Review Comment:
   Is it possible to minimize the copied code between here and the long 
version, such as by using a shared helper somewhere?
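   
   One possible shape for such a helper, as a sketch only. It assumes the 
long and double versions differ mainly in how a match value is coerced to the 
dictionary's element type, which I have not verified against the full code. 
`SortedValueLookupSketch` and `findDictionaryIds` are hypothetical names, and a 
plain sorted `List` stands in for the dictionary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical shared helper for illustration only; not part of the PR.
final class SortedValueLookupSketch
{
  // Coerces each match value to the dictionary's type and returns the positions
  // of the values that are present in the sorted dictionary.
  static <T> List<Integer> findDictionaryIds(
      List<?> sortedValues,
      Function<Object, T> coercer,
      List<T> dictionary,
      Comparator<? super T> comparator
  )
  {
    final List<Integer> ids = new ArrayList<>();
    for (Object raw : sortedValues) {
      final T value = coercer.apply(raw);
      final int id = Collections.binarySearch(dictionary, value, comparator);
      if (id >= 0) {
        ids.add(id);
      }
    }
    return ids;
  }

  public static void main(String[] args)
  {
    final Function<Object, Double> toDouble = o -> o == null ? null : ((Number) o).doubleValue();
    final Comparator<Double> cmp = Comparator.nullsFirst(Comparator.<Double>naturalOrder());
    System.out.println(findDictionaryIds(Arrays.asList(null, 2, 2.5), toDouble, Arrays.asList(null, 1.0, 2.5), cmp));
  }
}
```

   The long version would then pass a coercer based on `longValue()` and its 
own dictionary, and the per-type classes would shrink to thin wrappers.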



##########
processing/src/test/java/org/apache/druid/segment/filter/TypedInFilterTests.java:
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.filter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.RangeSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.TypedInFilter;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(Enclosed.class)
+public class TypedInFilterTests

Review Comment:
   Is there a way to write a test class that subclasses `InFilterTest` so it 
automatically gets any new tests we add there? Maybe make `inFilter` a 
`protected abstract` method rather than a `static`. The copied tests here make 
it tough to properly maintain the test suite.



##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrays;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import 
org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements 
Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  @Nullable
+  private final List<?> unsortedValues;
+  private final Supplier<List<?>> lazyMatchValues;
+  @Nullable
+  private final Supplier<SortedSet<ByteBuffer>> lazyMatchValueBytes;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final Supplier<DruidPredicateFactory> predicateFactorySupplier;
+
+  @JsonIgnore
+  private final Supplier<byte[]> cacheKeySupplier;
+
+  /**
+   * Creates a new filter.
+   *
+   * @param column         column to search
+   * @param values         set of values to match. This collection may be 
reused to avoid copying a big collection.
+   *                       Therefore, callers should <b>not</b> modify the 
collection after it is passed to this
+   *                       constructor.
+   * @param matchValueType type of values contained in set
+   * @param filterTuning   optional tuning
+   */
+  @JsonCreator
+  public TypedInFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("values") @Nullable List<?> values,
+      @JsonProperty("sortedValues") @Nullable List<?> sortedValues,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (NullHandling.replaceWithDefault()) {
+      throw InvalidInput.exception(

Review Comment:
   Better to do this check in `toFilter()` or one of the runtime methods 
rather than at DimFilter construction time. Doing it on the `@JsonCreator` 
path means the filter can't be deserialized if the flag changes value, which 
could cause issues because filters end up in task specs (`transformSpec`), 
etc.
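
   To illustrate the idea, here is a minimal sketch of deferring the mode 
check from the constructor to a runtime method. It is not the PR's code: 
`SketchInFilter` and the `legacyNullMode` supplier are hypothetical stand-ins 
(the supplier plays the role of a flag such as 
`NullHandling.replaceWithDefault()`), and the exception types are plain JDK 
ones chosen only to keep the example self-contained.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a DimFilter; none of these names are Druid APIs.
final class SketchInFilter
{
  private final String column;
  private final List<?> values;
  // Assumption: stands in for a process-wide mode flag that may change between restarts.
  private final BooleanSupplier legacyNullMode;

  SketchInFilter(String column, List<?> values, BooleanSupplier legacyNullMode)
  {
    // Only structural validation here, so construction (and therefore
    // deserialization) never depends on the current value of the flag.
    if (column == null) {
      throw new IllegalArgumentException("column cannot be null");
    }
    if (values == null) {
      throw new IllegalArgumentException("values cannot be null");
    }
    this.column = column;
    this.values = values;
    this.legacyNullMode = legacyNullMode;
  }

  // Runtime entry point: the mode check runs only when the filter is actually used.
  SketchInFilter toFilter()
  {
    if (legacyNullMode.getAsBoolean()) {
      throw new IllegalStateException(
          "IN filter on column [" + column + "] requires SQL compatible null handling"
      );
    }
    return this;
  }

  int size()
  {
    return values.size();
  }
}
```

   With this shape, a stored spec containing the filter can still be read 
back after the flag flips; the failure only surfaces when something tries to 
run the filter.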



##########
processing/src/main/java/org/apache/druid/query/filter/TypedInFilter.java:
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrays;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import 
org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueSetIndexes;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+public class TypedInFilter extends AbstractOptimizableDimFilter implements 
Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  @Nullable
+  private final List<?> unsortedValues;
+  private final Supplier<List<?>> lazyMatchValues;
+  @Nullable
+  private final Supplier<SortedSet<ByteBuffer>> lazyMatchValueBytes;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final Supplier<DruidPredicateFactory> predicateFactorySupplier;
+
+  @JsonIgnore
+  private final Supplier<byte[]> cacheKeySupplier;
+
+  /**
+   * Creates a new filter.
+   *
+   * @param column         column to search
+   * @param values         set of values to match. This collection may be 
reused to avoid copying a big collection.
+   *                       Therefore, callers should <b>not</b> modify the 
collection after it is passed to this
+   *                       constructor.
+   * @param matchValueType type of values contained in set
+   * @param filterTuning   optional tuning
+   */
+  @JsonCreator
+  public TypedInFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("values") @Nullable List<?> values,
+      @JsonProperty("sortedValues") @Nullable List<?> sortedValues,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (NullHandling.replaceWithDefault()) {
+      throw InvalidInput.exception(
+          "Invalid IN filter, typed in filter only supports SQL compatible 
null handling mode, set druid.generic.useDefaultValue=false to use this filter"
+      );
+    }
+    this.column = column;
+    if (column == null) {
+      throw InvalidInput.exception("Invalid IN filter, column cannot be null");
+    }
+    this.filterTuning = filterTuning;
+    this.matchValueType = matchValueType;
+    if (matchValueType == null) {
+      throw InvalidInput.exception("Invalid IN filter on column [%s], 
matchValueType cannot be null", column);
+    }
+    // one of sorted or not sorted
+    if (values == null && sortedValues == null) {
+      throw InvalidInput.exception(
+          "Invalid IN filter on column [%s], exactly one of values or 
sortedValues must be non-null",
+          column
+      );
+    }
+    if (sortedValues != null) {
+      this.unsortedValues = null;
+      this.lazyMatchValues = () -> sortedValues;
+    } else {
+      if (checkSorted(values, matchValueType)) {
+        this.unsortedValues = null;
+        this.lazyMatchValues = () -> values;
+      } else {
+        this.unsortedValues = values;
+        this.lazyMatchValues = Suppliers.memoize(() -> 
sortValues(unsortedValues, matchValueType));
+      }
+    }
+    if (matchValueType.is(ValueType.STRING)) {
+      this.lazyMatchValueBytes = Suppliers.memoize(() -> {
+        final SortedSet<ByteBuffer> matchValueBytes = new 
ObjectAVLTreeSet<>(ByteBufferUtils.utf8Comparator());
+        for (Object s : lazyMatchValues.get()) {
+          matchValueBytes.add(StringUtils.toUtf8ByteBuffer(Evals.asString(s)));
+        }
+        return matchValueBytes;
+      });
+    } else {
+      this.lazyMatchValueBytes = null;
+    }
+
+    this.predicateFactorySupplier = Suppliers.memoize(
+        () -> new PredicateFactory(lazyMatchValues.get(), matchValueType)
+    );
+    this.cacheKeySupplier = Suppliers.memoize(this::computeCacheKey);
+  }
+
+  @JsonProperty
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty
+  public List<?> getSortedValues()
+  {
+    return lazyMatchValues.get();
+  }
+
+  @JsonProperty
+  public ColumnType getMatchValueType()
+  {
+    return matchValueType;
+  }
+
+  @Nullable
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @JsonProperty
+  public FilterTuning getFilterTuning()
+  {
+    return filterTuning;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return cacheKeySupplier.get();
+  }
+
+  @Override
+  public DimFilter optimize(final boolean mayIncludeUnknown)
+  {
+    final List<?> matchValues = lazyMatchValues.get();
+    if (matchValues.isEmpty()) {
+      return FalseDimFilter.instance();
+    } else if (matchValues.size() == 1) {
+      if (matchValues.get(0) == null) {
+        return NullFilter.forColumn(column);
+      }
+      return new EqualityFilter(
+          column,
+          matchValueType,
+          matchValues.iterator().next(),
+          filterTuning
+      );
+    }
+    return this;
+  }
+
+  @Override
+  public Filter toFilter()
+  {
+    return this;
+  }
+
+  @Nullable
+  @Override
+  public RangeSet<String> getDimensionRangeSet(String dimension)
+  {
+    if (!Objects.equals(getColumn(), dimension)) {
+      return null;
+    }
+    RangeSet<String> retSet = TreeRangeSet.create();
+    for (Object value : lazyMatchValues.get()) {
+      String valueEquivalent = 
NullHandling.nullToEmptyIfNeeded(Evals.asString(value));
+      if (valueEquivalent == null) {
+        // Case when SQL compatible null handling is enabled
+        // Range.singleton(null) is invalid, so use the fact that
+        // only null values are less than empty string.
+        retSet.add(Range.lessThan(""));
+      } else {
+        retSet.add(Range.singleton(valueEquivalent));
+      }
+    }
+    return retSet;
+  }
+
+  @Override
+  public Set<String> getRequiredColumns()
+  {
+    return ImmutableSet.of(column);
+  }
+
+  @Override
+  @Nullable
+  public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
+  {
+    if (!Filters.checkFilterTuningUseIndex(column, selector, filterTuning)) {
+      return null;
+    }
+    final ColumnIndexSupplier indexSupplier = 
selector.getIndexSupplier(column);
+
+    if (indexSupplier == null) {
+      // column doesn't exist, match against null
+      DruidPredicateMatch match = 
predicateFactorySupplier.get().makeStringPredicate().apply(null);
+      return Filters.makeMissingColumnNullIndex(match, selector);
+    }
+
+    if (lazyMatchValueBytes != null) {
+      final Utf8ValueSetIndexes utf8ValueSetIndexes = 
indexSupplier.as(Utf8ValueSetIndexes.class);
+      if (utf8ValueSetIndexes != null) {
+        return 
utf8ValueSetIndexes.forSortedValuesUtf8(lazyMatchValueBytes.get());
+      }
+    }
+
+    final ValueSetIndexes valueSetIndexes = 
indexSupplier.as(ValueSetIndexes.class);
+    if (valueSetIndexes != null) {
+      return valueSetIndexes.forSortedValues(lazyMatchValues.get(), 
matchValueType);
+    }
+
+    return Filters.makePredicateIndex(
+        column,
+        selector,
+        predicateFactorySupplier.get()
+    );
+  }
+
+  @Override
+  public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
+  {
+    return Filters.makeValueMatcher(factory, column, 
predicateFactorySupplier.get());
+  }
+
+  @Override
+  public VectorValueMatcher makeVectorMatcher(final 
VectorColumnSelectorFactory factory)
+  {
+    return ColumnProcessors.makeVectorProcessor(
+        column,
+        VectorValueMatcherColumnProcessorFactory.instance(),
+        factory
+    ).makeMatcher(predicateFactorySupplier.get());
+  }
+
+  @Override
+  public boolean canVectorizeMatcher(ColumnInspector inspector)
+  {
+    return true;
+  }
+
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(column);
+    if (rewriteDimensionTo == null) {
+      throw new IAE("Received a non-applicable rewrite: %s, filter's 
dimension: %s", columnRewrites, column);
+    }
+
+    if (rewriteDimensionTo.equals(column)) {
+      return this;
+    } else {
+      return new TypedInFilter(
+          rewriteDimensionTo,
+          matchValueType,
+          null,
+          lazyMatchValues.get(),
+          filterTuning
+      );
+    }
+  }
+
+  @Override
+  public String toString()
+  {
+    final DimFilter.DimFilterToStringBuilder builder = new 
DimFilter.DimFilterToStringBuilder();
+    return builder.appendDimension(column, null)
+                  .append(" IN (")
+                  .append(Joiner.on(", 
").join(Iterables.transform(lazyMatchValues.get(), String::valueOf)))
+                  .append(")")
+                  .append(" (" + matchValueType + ")")
+                  .appendFilterTuning(filterTuning)
+                  .build();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TypedInFilter that = (TypedInFilter) o;
+    return column.equals(that.column) &&
+           Objects.equals(matchValueType, that.matchValueType) &&
+           compareValues(lazyMatchValues.get(), that.lazyMatchValues.get(), 
matchValueType) &&
+           Objects.equals(filterTuning, that.filterTuning);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(lazyMatchValues.get(), column, matchValueType, 
filterTuning);
+  }
+
+  private byte[] computeCacheKey()
+  {
+    // Hash all values, in sorted order, as their length followed by their 
content.
+    final Hasher hasher = Hashing.sha256().newHasher();
+    for (Object v : lazyMatchValues.get()) {
+      if (v == null) {
+        // Encode null as length -1, no content.
+        hasher.putInt(-1);
+      } else {
+        final String s = Evals.asString(v);
+        hasher.putInt(s.length());
+        hasher.putString(s, StandardCharsets.UTF_8);
+      }
+    }
+
+    return new CacheKeyBuilder(DimFilterUtils.NEW_IN_CACHE_ID)
+        .appendString(column)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendString(matchValueType.asTypeString())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(hasher.hash().asBytes())
+        .build();
+  }
+
+  private static boolean checkSorted(List<?> unsortedValues, ColumnType 
matchValueType)
+  {
+    final Comparator<Object> comparator = matchValueType.getNullableStrategy();
+    Object prev = null;
+    boolean needsCoerceCheck = true;
+    for (Object o : unsortedValues) {
+      if (needsCoerceCheck && o != null) {
+        Object coerced = coerceValue(o, matchValueType);
+        //noinspection ObjectEquality
+        if (coerced != o) {
+          return false;
+        }
+        needsCoerceCheck = false;
+      }
+      if (prev != null && comparator.compare(prev, o) >= 0) {
+        return false;
+      }
+      prev = o;
+    }
+    return true;
+  }
+
+  @Nullable
+  private static Object coerceValue(@Nullable Object o, ColumnType 
matchValueType)
+  {
+    if (o == null) {
+      return o;
+    }
+    switch (matchValueType.getType()) {
+      case STRING:
+        return DimensionHandlerUtils.convertObjectToString(o);
+      case LONG:
+        return DimensionHandlerUtils.convertObjectToLong(o);
+      case FLOAT:
+        return DimensionHandlerUtils.convertObjectToFloat(o);
+      case DOUBLE:
+        return DimensionHandlerUtils.convertObjectToDouble(o);
+      default:
+        throw InvalidInput.exception("Unsupported matchValueType[%s]", 
matchValueType);
+    }
+  }
+
+  private static List<?> sortValues(List<?> unsortedValues, ColumnType 
matchValueType)
+  {
+    final Object[] array = unsortedValues.toArray(new Object[0]);
+    // check if values need coerced
+    for (int i = 0; i < array.length; i++) {
+      Object coerced = coerceValue(array[i], matchValueType);
+      //noinspection ObjectEquality
+      if (coerced != null && array[i] == coerced) {
+        // assume list is all same type objects...
+        break;
+      }
+      array[i] = coerced;
+    }
+    ObjectArrays.quickSort(array, matchValueType.getNullableStrategy());
+    return Arrays.asList(array);
+  }
+
+  /**
+   * Since jackson might translate longs into ints and such, we use the type 
comparator to check lists
+   * for {@link #equals(Object)} instead of directly using {@link 
Objects#equals(Object, Object)}
+   */
+  private static boolean compareValues(List<?> o1, List<?> o2, ColumnType 
matchValueType)
+  {
+    final NullableTypeStrategy comparator = 
matchValueType.getNullableStrategy();
+    //noinspection ObjectEquality
+    if (o1 == o2) {
+      return true;
+    }
+    if (o1 == null) {
+      return false;
+    }
+    if (o2 == null) {
+      return false;
+    }
+    final int iter = Math.min(o1.size(), o2.size());
+    for (int i = 0; i < iter; i++) {
+      final int cmp = comparator.compare(o1.get(i), o2.get(i));
+      if (cmp == 0) {
+        continue;
+      }
+      return false;
+    }
+    return o1.size() == o2.size();
+  }
+
+  private static DruidObjectPredicate<String> createStringPredicate(
+      final List sortedValues,

Review Comment:
   I usually prefer `List<?>` if it works OK, because it at least confines 
the unknown-ness to the single type parameter rather than encouraging 
dropping all type parameters.
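
   A small sketch of the difference, using a hypothetical 
`WildcardListSketch` class that is not part of the PR: with a raw `List` the 
compiler accepts an unchecked `add`, while `List<?>` still allows reading 
elements as `Object` but rejects adding anything other than `null`.

```java
import java.util.List;

// Hypothetical helper class, for illustration only.
final class WildcardListSketch
{
  // Raw type: the unchecked add compiles (with a warning), so a caller's
  // List<Long> can silently end up holding a String.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static void appendRaw(List values)
  {
    values.add("oops");
  }

  // Wildcard type: elements can be read as Object, but the list cannot be
  // polluted through this reference.
  static int countNonNull(List<?> values)
  {
    int count = 0;
    for (Object value : values) {
      if (value != null) {
        count++;
      }
    }
    // values.add("oops"); // would not compile against List<?>
    return count;
  }
}
```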



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


