imply-cheddar commented on code in PR #14542:
URL: https://github.com/apache/druid/pull/14542#discussion_r1260579257


##########
extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java:
##########
@@ -444,7 +443,11 @@ public void testEmptyTimeseriesResults()
             Druids.newTimeseriesQueryBuilder()
                   .dataSource(CalciteTests.DATASOURCE1)
                  .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
-                  .filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
+                  .filters(
+                      NullHandling.replaceWithDefault()
+                      ? numericSelector("dim2", "0", null)
+                      : equality("dim2", 0L, ColumnType.LONG)
+                  )

Review Comment:
   Guessing you wrote this before you abstracted the methods?  I.e. can't you 
just use `equality()` given that it's doing the check for you already?



##########
extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java:
##########
@@ -852,7 +855,11 @@ public void testEmptyTimeseriesResults()
         ImmutableList.of(Druids.newTimeseriesQueryBuilder()
                                .dataSource(CalciteTests.DATASOURCE1)
                                .intervals(querySegmentSpec(Filtration.eternity()))
-                               .filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
+                               .filters(
+                                   NullHandling.replaceWithDefault()
+                                   ? numericSelector("dim2", "0", null)
+                                   : equality("dim2", 0L, ColumnType.LONG)
+                               )

Review Comment:
   I had this comment above and I think the difference might be the 
`numericSelector`?  If so, couldn't the `equality` look at the type parameter 
to figure out if it should create a "normal" selector or a numeric one?



##########
processing/src/main/java/org/apache/druid/math/expr/ExprEval.java:
##########
@@ -142,6 +142,26 @@ public static void serialize(ByteBuffer buffer, int position, ExpressionType typ
     }
   }
 
+  public static byte[] toBytes(ExpressionType expressionType, NullableTypeStrategy<Object> strategy, Object o)
+  {
+    // convert the array to byte[] form so that we take a hash of the whole array

Review Comment:
   "so that we take a hash of the whole array" seems strangely specific for a 
public static method on `ExprEval`



##########
processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.TypeSignature;
+import org.apache.druid.segment.column.TypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.DimensionPredicateFilter;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.filter.PredicateValueMatcherFactory;
+import org.apache.druid.segment.filter.ValueMatchers;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.StringValueSetIndex;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class EqualityFilter extends AbstractOptimizableDimFilter implements Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  private final Object matchValue;
+  @Nullable
+  private final ExtractionFn extractionFn;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final DruidPredicateFactory predicateFactory;
+
+  @JsonCreator
+  public EqualityFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("matchValue") Object matchValue,
+      @JsonProperty("extractionFn") @Nullable ExtractionFn extractionFn,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (column == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter, column cannot be null");
+    }
+    this.column = column;
+    if (matchValueType == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter on column [%s], matchValueType cannot be null", column);
+    }
+    this.matchValueType = matchValueType;
+    if (matchValue == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter on column [%s], matchValue cannot be null", column);
+    }
+    this.matchValue = matchValue;
+    this.extractionFn = extractionFn;
+    this.filterTuning = filterTuning;
+    this.predicateFactory = new EqualityPredicateFactory(matchValue, matchValueType);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    final TypeStrategy<Object> typeStrategy = matchValueType.getStrategy();
+    final int size = typeStrategy.estimateSizeBytes(matchValue);
+    final ByteBuffer valueBuffer = ByteBuffer.allocate(size);
+    typeStrategy.write(valueBuffer, matchValue, size);
+    return new CacheKeyBuilder(DimFilterUtils.EQUALS_CACHE_ID)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendString(column)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendString(matchValueType.asTypeString())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(valueBuffer.array())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(extractionFn == null ? new byte[0] : extractionFn.getCacheKey())
+        .build();
+  }
+
+  @Override
+  public DimFilter optimize()
+  {
+    return this;
+  }
+
+  @Override
+  public Filter toFilter()
+  {
+    if (extractionFn == null) {
+      return this;
+    } else {
+      return new DimensionPredicateFilter(column, predicateFactory, extractionFn, filterTuning);
+    }
+  }
+
+  @JsonProperty
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty
+  public ColumnType getMatchValueType()
+  {
+    return matchValueType;
+  }
+
+  @JsonProperty
+  public Object getMatchValue()
+  {
+    return matchValue;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public ExtractionFn getExtractionFn()
+  {
+    return extractionFn;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public FilterTuning getFilterTuning()
+  {
+    return filterTuning;
+  }
+
+  @Override
+  public String toString()
+  {
+    DimFilter.DimFilterToStringBuilder bob =
+        new DimFilter.DimFilterToStringBuilder().appendDimension(column, extractionFn)
+                                                .append(" = ")
+                                                .append(matchValue);
+
+    if (!ColumnType.STRING.equals(matchValueType)) {
+      bob.append(" (" + matchValueType.asTypeString() + ")");
+    }
+    return bob.appendFilterTuning(filterTuning).build();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    EqualityFilter that = (EqualityFilter) o;
+    if (!column.equals(that.column)) {
+      return false;
+    }
+    if (!Objects.equals(matchValueType, that.matchValueType)) {
+      return false;
+    }
+    if (!Objects.equals(extractionFn, that.extractionFn)) {
+      return false;
+    }
+    if (!Objects.equals(filterTuning, that.filterTuning)) {
+      return false;
+    }
+    if (matchValueType.isArray()) {
+      // just use predicate to see if the values are the same
+      final ExprEval<?> thatValue = ExprEval.ofType(
+          ExpressionType.fromColumnType(that.matchValueType),
+          that.matchValue
+      );
+      final Predicate<Object[]> arrayPredicate = predicateFactory.makeArrayPredicate(matchValueType);
+      return arrayPredicate.apply(thatValue.asArray());
+    } else {
+      return Objects.equals(matchValue, that.matchValue);
+    }
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(column, matchValueType, matchValue, extractionFn, filterTuning);
+  }
+
+  @Override
+  public RangeSet<String> getDimensionRangeSet(String dimension)
+  {
+    if (!Objects.equals(getColumn(), dimension) || getExtractionFn() != null) {
+      return null;
+    }
+    RangeSet<String> retSet = TreeRangeSet.create();
+    retSet.add(Range.singleton(String.valueOf(matchValue)));
+    return retSet;
+  }
+
+  @Nullable
+  @Override
+  public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
+  {
+    if (!Filters.checkFilterTuningUseIndex(column, selector, filterTuning)) {
+      return null;
+    }
+
+    final ColumnIndexSupplier indexSupplier = selector.getIndexSupplier(column);
+    if (indexSupplier == null) {
+      return Filters.makeNullIndex(false, selector);
+    }
+
+    final StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
+    if (valueSetIndex == null) {
+      // column exists, but has no index
+      return null;
+    }
+    return valueSetIndex.forValue(String.valueOf(matchValue));
+  }
+
+  @Override
+  public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
+  {
+    return ColumnProcessors.makeProcessor(
+        column,
+        new TypedConstantValueMatcherFactory(matchValue, matchValueType),
+        factory
+    );
+  }
+
+  @Override
+  public VectorValueMatcher makeVectorMatcher(VectorColumnSelectorFactory factory)
+  {
+    final ColumnCapabilities capabilities = factory.getColumnCapabilities(column);
+
+    if (matchValueType.isPrimitive() && (capabilities == null || capabilities.isPrimitive())) {
+      return ColumnProcessors.makeVectorProcessor(
+          column,
+          VectorValueMatcherColumnProcessorFactory.instance(),
+          factory
+      ).makeMatcher(matchValue, matchValueType);
+    }
+    return ColumnProcessors.makeVectorProcessor(
+        column,
+        VectorValueMatcherColumnProcessorFactory.instance(),
+        factory
+    ).makeMatcher(new EqualityPredicateFactory(matchValue, matchValueType));
+  }
+
+  @Override
+  public boolean supportsSelectivityEstimation(ColumnSelector columnSelector, ColumnIndexSelector indexSelector)
+  {
+    return Filters.supportsSelectivityEstimation(this, column, columnSelector, indexSelector);
+  }
+
+  @Override
+  public boolean canVectorizeMatcher(ColumnInspector inspector)
+  {
+    return true;
+  }
+
+  @Override
+  public Set<String> getRequiredColumns()
+  {
+    return ImmutableSet.of(column);
+  }
+
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(column);
+
+    if (rewriteDimensionTo == null) {
+      throw new IAE(
+          "Received a non-applicable rewrite: %s, filter's dimension: %s",
+          columnRewrites,
+          columnRewrites
+      );
+    }
+
+    return new EqualityFilter(
+        rewriteDimensionTo,
+        matchValueType,
+        matchValue,
+        extractionFn,
+        filterTuning
+    );
+  }
+
+  private static class EqualityPredicateFactory implements DruidPredicateFactory
+  {
+    private final ExprEval<?> matchValue;
+    private final ColumnType matchValueType;
+
+    private final Object initLock = new Object();
+
+    private volatile DruidLongPredicate longPredicate;
+    private volatile DruidFloatPredicate floatPredicate;
+    private volatile DruidDoublePredicate doublePredicate;
+
+    public EqualityPredicateFactory(Object matchValue, ColumnType matchValueType)
+    {
+      this.matchValue = ExprEval.ofType(ExpressionType.fromColumnType(matchValueType), matchValue);
+      this.matchValueType = matchValueType;
+    }
+
+    @Override
+    public Predicate<String> makeStringPredicate()
+    {
+      return Predicates.equalTo(matchValue.castTo(ExpressionType.STRING).asString());
+    }
+
+    @Override
+    public DruidLongPredicate makeLongPredicate()
+    {
+      initLongPredicate();
+      return longPredicate;
+    }
+
+    @Override
+    public DruidFloatPredicate makeFloatPredicate()
+    {
+      initFloatPredicate();
+      return floatPredicate;
+    }
+
+    @Override
+    public DruidDoublePredicate makeDoublePredicate()
+    {
+      initDoublePredicate();
+      return doublePredicate;
+    }
+
+    @Override
+    public Predicate<Object[]> makeArrayPredicate(@Nullable TypeSignature<ValueType> arrayType)
+    {
+      if (arrayType != null) {
+        final Comparator<Object[]> arrayComparator = arrayType.getNullableStrategy();
+        final Object[] matchArray = matchValue.castTo(ExpressionType.fromColumnType(arrayType)).asArray();
+        return input -> arrayComparator.compare(input, matchArray) == 0;
+      } else {
+        // fall back to per row detection if input array type is unknown
+        return input -> {
+          final ExprEval<?> eval = ExprEval.bestEffortOf(input);
+          final Comparator<Object[]> arrayComparator = arrayType.getNullableStrategy();
+          final Object[] matchArray = matchValue.castTo(eval.type()).asArray();
+          return arrayComparator.compare(input, matchArray) == 0;
+        };
+      }
+    }
+
+    @Override
+    public Predicate<Object> makeObjectPredicate()
+    {
+      if (matchValueType.equals(ColumnType.NESTED_DATA)) {
+        return input -> Objects.equals(StructuredData.unwrap(input), StructuredData.unwrap(matchValue.value()));
+      }
+      return Predicates.equalTo(matchValue.valueOrDefault());
+    }
+
+    private void initLongPredicate()
+    {
+      if (longPredicate != null) {
+        return;
+      }
+      synchronized (initLock) {
+        if (longPredicate != null) {
+          return;
+        }
+        if (matchValue == null) {
+          longPredicate = DruidLongPredicate.MATCH_NULL_ONLY;
+          return;
+        }

Review Comment:
   Isn't this impossible?



##########
extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java:
##########
@@ -165,6 +169,23 @@ public boolean applyNull()
               }
             };
           }
+
+          @Override
+          public Predicate<Object[]> makeArrayPredicate(@Nullable TypeSignature<ValueType> arrayType)
+          {
+            final ExpressionType expressionType = arrayType == null || !arrayType.isArray()

Review Comment:
   I would hope that by the time we're in `makeArrayPredicate` we are already past needing to validate that the type passed in satisfies `arrayType.isArray()`...  It would be nice if there were some class specialization or something that could be done on the actual method signature which would help us in just assuming that it's an array type...
   
   That or the code could just assume it?



##########
extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java:
##########
@@ -78,6 +85,8 @@ public class SketchAggregationTest
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
+  private final Closer closer;

Review Comment:
   Hrm, I'm seeing the same closer pattern here too, and it looks like it's building the same segments.  Q for you:
   
   What do you think about building the segment as a shared thing on `NestedDataUtilsTest` and using a reference-counting pattern to determine when it gets cleaned up?  That is, each test class statically does something like `NestedDataUtilsTest.getArraysSegmentReference()` and stores the returned object.  There's then an `@AfterClass` which counts down the reference from `NestedDataUtilsTest`.  With "normal" static initialization, all of the tests that use the same segment would have references "checked out", and once the last one completes its `@AfterClass`, that's when the cleanup would finally occur.
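   The reference-counting idea could look roughly like this; a hedged sketch with illustrative names (`RefCountedSegmentReference` and its methods are made up, not actual Druid test utilities):

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a reference-counted shared test segment: each test class
// checks out a reference during static initialization and closes it
// from @AfterClass; the expensive cleanup (e.g. unmapping the segment)
// runs only when the last reference is released.
class RefCountedSegmentReference
{
  private final AtomicInteger refCount = new AtomicInteger(0);
  private final Runnable cleanup;

  RefCountedSegmentReference(Runnable cleanup)
  {
    this.cleanup = cleanup;
  }

  // e.g. what a hypothetical NestedDataUtilsTest.getArraysSegmentReference() would hand out
  Closeable checkout()
  {
    refCount.incrementAndGet();
    return this::release;
  }

  private void release()
  {
    if (refCount.decrementAndGet() == 0) {
      cleanup.run();  // last holder gone: actually tear down the segment
    }
  }
}
```

   The key property is that the teardown happens exactly once, after the final `@AfterClass`, no matter how many test classes shared the segment.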



##########
indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java:
##########
@@ -683,6 +684,117 @@ public void close()
     Assert.assertTrue("Sequence is not closed", isSequenceClosed.booleanValue());
   }
 
+  @Test
+  public void testArrayColumns() throws IOException
+  {
+    // Write a segment with two rows in it, with columns: s (string), d (double), cnt (long), met_s (complex).

Review Comment:
   This comment doesn't really align with the code that I see below it.  The 
names of things seem stale?  Is it copy pasta from somewhere?



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java:
##########
@@ -41,6 +42,9 @@ public static void updateSketch(final HllSketch sketch, final StringEncoding str
       sketch.update(((Number) value).doubleValue());
     } else if (value instanceof String) {
       updateSketchWithString(sketch, stringEncoding, (String) value);
+    } else if (value instanceof Object[]) {
+      byte[] arrayBytes = ExprEval.toBytesBestEffort(value);
+      sketch.update(arrayBytes);

Review Comment:
   Seeing this side-by-side with the `instanceof List` code produces a bit of a head-scratch: why are an `Object[]` and a `List` treated differently when they are logically the same thing?  This likely deserves a comment that explains why we get an Object array in some cases and a List in others, and what the differences are.



##########
extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java:
##########
@@ -91,6 +93,12 @@ public class BloomFilterAggregatorTest extends InitializedNullHandlingTest
   private static final Float[] FLOAT_VALUES1 = new Float[]{0.4f, 0.8f, 23.2f};
  private static final Long[] LONG_VALUES1 = new Long[]{10241L, 12312355L, 0L, 81L};
 
+  private static final Object[] ARRAY_VALUES = new Object[]{
+      new Object[]{1L, 2L},
+      new Object[]{3L, 4L},
+      new Object[]{0L, 1000L}
+  };

Review Comment:
   Not sure how important it is, but I notice that you are not validating 
`null` or `new Object[]{1L, null}` and making sure that it handles null as 
expected.



##########
processing/src/main/java/org/apache/druid/query/filter/BoundDimFilter.java:
##########
@@ -610,13 +610,14 @@ private Supplier<DruidDoublePredicate> makeDoublePredicateSupplier()
     return Suppliers.memoize(doublePredicate);
   }
 
-  private static DruidLongPredicate makeLongPredicateFromBounds(
+  static DruidLongPredicate makeLongPredicateFromBounds(

Review Comment:
   Is there a reason it cannot just be public?  You had reason to make it not private; it shouldn't really hurt to take it fully public...



##########
extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java:
##########
@@ -98,6 +110,12 @@ public static Collection<?> constructorFeeder()
     return constructors;
   }
 
+  @After
+  public void teardown() throws IOException
+  {
+    closer.close();
+  }
+

Review Comment:
   This closer is used to unmap a segment.  Such behaviors tend to be relatively expensive, and I'm scared that the pattern in this test will lead to more tests that build and kill the same segment multiple times, which has been a common pattern that slows down our tests.
   
   Would it make sense to use `@BeforeClass` and `@AfterClass` and build the segment once so that it can be reused by all of the tests on the class?  For this one case, that would amount to reuse across all of the different parameters of the test, which I think would be a good thing.



##########
processing/src/main/java/org/apache/druid/query/filter/DruidPredicateFactory.java:
##########
@@ -33,6 +38,11 @@ public interface DruidPredicateFactory
 
   DruidDoublePredicate makeDoublePredicate();
 
+  default Predicate<Object[]> makeArrayPredicate(@Nullable TypeSignature<ValueType> inputType)
+  {
+    throw new UOE("Predicate does not support ARRAY types");

Review Comment:
   It would be nice to include `getClass()` or `toString()` or something here, as the default method will just show up as the interface in the stack trace, not the concrete implementation.
   
   Also, for these UOE you could consider `DruidException.defensive()` or, if you are thinking that these should make it to an end-user, something for the USER persona...
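   As a hedged illustration of the `getClass()` suggestion (plain `UnsupportedOperationException` standing in for Druid's `UOE`; the interface and class names below are made up):

```java
import java.util.function.Predicate;

interface SketchPredicateFactory
{
  // Without getClass(), an exception thrown from this default method
  // points only at the interface; including the runtime class names
  // the concrete implementation in the message.
  default Predicate<Object[]> makeArrayPredicate()
  {
    throw new UnsupportedOperationException(
        "Predicate factory [" + getClass().getName() + "] does not support ARRAY types"
    );
  }
}

class StringOnlyPredicateFactory implements SketchPredicateFactory
{
}
```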



##########
processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.TypeSignature;
+import org.apache.druid.segment.column.TypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.DimensionPredicateFilter;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.filter.PredicateValueMatcherFactory;
+import org.apache.druid.segment.filter.ValueMatchers;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.StringValueSetIndex;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class EqualityFilter extends AbstractOptimizableDimFilter implements Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  private final Object matchValue;
+  @Nullable
+  private final ExtractionFn extractionFn;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final DruidPredicateFactory predicateFactory;
+
+  @JsonCreator
+  public EqualityFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("matchValue") Object matchValue,
+      @JsonProperty("extractionFn") @Nullable ExtractionFn extractionFn,

Review Comment:
   We talked about this a bit directly, but I find it unfortunate that this is bringing `ExtractionFn` forward.  I think it would be beneficial to not implement `ExtractionFn` as an option on these filters and instead require the SQL planning to meet its needs in other ways.



##########
processing/src/main/java/org/apache/druid/math/expr/ExprEval.java:
##########
@@ -142,6 +142,26 @@ public static void serialize(ByteBuffer buffer, int position, ExpressionType typ
     }
   }
 
+  public static byte[] toBytes(ExpressionType expressionType, NullableTypeStrategy<Object> strategy, Object o)
+  {
+    // convert the array to byte[] form so that we take a hash of the whole array
+    final ExprEval<?> eval = ExprEval.ofType(expressionType, o);
+    final int size = strategy.estimateSizeBytes(eval.valueOrDefault());
+    final ByteBuffer buffer = ByteBuffer.allocate(size);
+    strategy.write(buffer, eval.valueOrDefault(), size);
+    return buffer.array();
+  }
+
+  public static byte[] toBytesBestEffort(Object o)
+  {
+    final ExprEval<?> eval = ExprEval.bestEffortOf(o);
+    final NullableTypeStrategy<Object> strategy = eval.type().getNullableStrategy();
+    final int size = strategy.estimateSizeBytes(eval.valueOrDefault());
+    final ByteBuffer buffer = ByteBuffer.allocate(size);
+    strategy.write(buffer, eval.valueOrDefault(), size);
+    return buffer.array();

Review Comment:
   I find myself feeling like these 4 lines could be reused by the other 
`toBytes()` as well.
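   One possible shape for that reuse, sketched against the methods already shown in this hunk (a suggestion, not the PR's actual code; `evalToBytes` is a hypothetical shared helper):

```java
public static byte[] toBytes(ExpressionType expressionType, NullableTypeStrategy<Object> strategy, Object o)
{
  return evalToBytes(strategy, ExprEval.ofType(expressionType, o));
}

public static byte[] toBytesBestEffort(Object o)
{
  final ExprEval<?> eval = ExprEval.bestEffortOf(o);
  return evalToBytes(eval.type().getNullableStrategy(), eval);
}

// shared serialization path extracted from the two methods above
private static byte[] evalToBytes(NullableTypeStrategy<Object> strategy, ExprEval<?> eval)
{
  final int size = strategy.estimateSizeBytes(eval.valueOrDefault());
  final ByteBuffer buffer = ByteBuffer.allocate(size);
  strategy.write(buffer, eval.valueOrDefault(), size);
  return buffer.array();
}
```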



##########
processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.TypeSignature;
+import org.apache.druid.segment.column.TypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.DimensionPredicateFilter;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.filter.PredicateValueMatcherFactory;
+import org.apache.druid.segment.filter.ValueMatchers;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.StringValueSetIndex;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class EqualityFilter extends AbstractOptimizableDimFilter implements Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  private final Object matchValue;
+  @Nullable
+  private final ExtractionFn extractionFn;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final DruidPredicateFactory predicateFactory;
+
+  @JsonCreator
+  public EqualityFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("matchValue") Object matchValue,
+      @JsonProperty("extractionFn") @Nullable ExtractionFn extractionFn,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (column == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter, column cannot be null");

Review Comment:
   `InvalidInput.exception()` is likely your friend.
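
The shorthand being suggested would collapse the persona/category builder chain into a single call. Below is a minimal, self-contained mock of that pattern, assuming a signature along the lines of `InvalidInput.exception(format, args...)`; the classes here are illustrative stand-ins, not Druid's actual error API:

```java
// Hypothetical stand-ins for Druid's error API; illustrative only.
public class InvalidInputSketch
{
  public static class DruidException extends RuntimeException
  {
    DruidException(String message)
    {
      super(message);
    }
  }

  // The suggested shorthand: one static call instead of the
  // forPersona(...).ofCategory(...).build(...) chain.
  public static DruidException invalidInput(String format, Object... args)
  {
    return new DruidException(String.format(format, args));
  }

  public static void main(String[] args)
  {
    DruidException e = invalidInput(
        "Invalid equality filter on column [%s], matchValue cannot be null", "dim2"
    );
    System.out.println(e.getMessage());
    // prints: Invalid equality filter on column [dim2], matchValue cannot be null
  }
}
```

The call sites in the constructor would shrink to one line each while producing the same user-facing message.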



##########
processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.TypeSignature;
+import org.apache.druid.segment.column.TypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.DimensionPredicateFilter;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.filter.PredicateValueMatcherFactory;
+import org.apache.druid.segment.filter.ValueMatchers;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.StringValueSetIndex;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class EqualityFilter extends AbstractOptimizableDimFilter implements Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  private final Object matchValue;
+  @Nullable
+  private final ExtractionFn extractionFn;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final DruidPredicateFactory predicateFactory;
+
+  @JsonCreator
+  public EqualityFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("matchValue") Object matchValue,
+      @JsonProperty("extractionFn") @Nullable ExtractionFn extractionFn,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (column == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter, column cannot be null");
+    }
+    this.column = column;
+    if (matchValueType == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter on column [%s], matchValueType cannot be null", column);
+    }
+    this.matchValueType = matchValueType;
+    if (matchValue == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter on column [%s], matchValue cannot be null", column);
+    }
+    this.matchValue = matchValue;
+    this.extractionFn = extractionFn;
+    this.filterTuning = filterTuning;
+    this.predicateFactory = new EqualityPredicateFactory(matchValue, matchValueType);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    final TypeStrategy<Object> typeStrategy = matchValueType.getStrategy();
+    final int size = typeStrategy.estimateSizeBytes(matchValue);
+    final ByteBuffer valueBuffer = ByteBuffer.allocate(size);
+    typeStrategy.write(valueBuffer, matchValue, size);
+    return new CacheKeyBuilder(DimFilterUtils.EQUALS_CACHE_ID)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendString(column)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendString(matchValueType.asTypeString())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(valueBuffer.array())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(extractionFn == null ? new byte[0] : extractionFn.getCacheKey())
+        .build();
+  }
+
+  @Override
+  public DimFilter optimize()
+  {
+    return this;
+  }
+
+  @Override
+  public Filter toFilter()
+  {
+    if (extractionFn == null) {
+      return this;
+    } else {
+      return new DimensionPredicateFilter(column, predicateFactory, extractionFn, filterTuning);
+    }
+  }
+
+  @JsonProperty
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty
+  public ColumnType getMatchValueType()
+  {
+    return matchValueType;
+  }
+
+  @JsonProperty
+  public Object getMatchValue()
+  {
+    return matchValue;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public ExtractionFn getExtractionFn()
+  {
+    return extractionFn;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public FilterTuning getFilterTuning()
+  {
+    return filterTuning;
+  }
+
+  @Override
+  public String toString()
+  {
+    DimFilter.DimFilterToStringBuilder bob = new DimFilter.DimFilterToStringBuilder()
+        .appendDimension(column, extractionFn)
+        .append(" = ")
+        .append(matchValue);
+
+    if (!ColumnType.STRING.equals(matchValueType)) {
+      bob.append(" (" + matchValueType.asTypeString() + ")");
+    }
+    return bob.appendFilterTuning(filterTuning).build();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    EqualityFilter that = (EqualityFilter) o;
+    if (!column.equals(that.column)) {
+      return false;
+    }
+    if (!Objects.equals(matchValueType, that.matchValueType)) {
+      return false;
+    }
+    if (!Objects.equals(extractionFn, that.extractionFn)) {
+      return false;
+    }
+    if (!Objects.equals(filterTuning, that.filterTuning)) {
+      return false;
+    }
+    if (matchValueType.isArray()) {
+      // just use predicate to see if the values are the same
+      final ExprEval<?> thatValue = ExprEval.ofType(
+          ExpressionType.fromColumnType(that.matchValueType),
+          that.matchValue
+      );
+      final Predicate<Object[]> arrayPredicate = predicateFactory.makeArrayPredicate(matchValueType);
+      return arrayPredicate.apply(thatValue.asArray());
+    } else {
+      return Objects.equals(matchValue, that.matchValue);
+    }
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(column, matchValueType, matchValue, extractionFn, filterTuning);
+  }
+
+  @Override
+  public RangeSet<String> getDimensionRangeSet(String dimension)
+  {
+    if (!Objects.equals(getColumn(), dimension) || getExtractionFn() != null) {
+      return null;
+    }
+    RangeSet<String> retSet = TreeRangeSet.create();
+    retSet.add(Range.singleton(String.valueOf(matchValue)));
+    return retSet;
+  }
+
+  @Nullable
+  @Override
+  public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
+  {
+    if (!Filters.checkFilterTuningUseIndex(column, selector, filterTuning)) {
+      return null;
+    }
+
+    final ColumnIndexSupplier indexSupplier = selector.getIndexSupplier(column);
+    if (indexSupplier == null) {
+      return Filters.makeNullIndex(false, selector);
+    }
+
+    final StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
+    if (valueSetIndex == null) {
+      // column exists, but has no index
+      return null;
+    }
+    return valueSetIndex.forValue(String.valueOf(matchValue));

Review Comment:
   I had expected that this would create an `ObjectValueSetIndex` or something 
that would enable us to just pass in the object and get indexes.  Why not do 
that?
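
As an illustration of the suggestion, here is a self-contained sketch of what an object-keyed value-set index could look like. The interface and map-backed implementation are hypothetical, not Druid's actual index API; the point is that a typed lookup (`0L`) and a string lookup (`"0"`) are different keys, which stringifying the match value papers over:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of an object-keyed value-set index; not Druid's actual API.
public class ObjectValueSetIndexSketch
{
  public interface ObjectValueSetIndex
  {
    // Row ids whose stored value equals the given (typed) match value.
    Set<Integer> forValue(Object matchValue);
  }

  public static class MapBackedIndex implements ObjectValueSetIndex
  {
    private final Map<Object, Set<Integer>> index = new HashMap<>();

    public void add(Object value, int rowId)
    {
      index.computeIfAbsent(value, v -> new TreeSet<>()).add(rowId);
    }

    @Override
    public Set<Integer> forValue(Object matchValue)
    {
      return index.getOrDefault(matchValue, Collections.emptySet());
    }
  }

  public static void main(String[] args)
  {
    MapBackedIndex idx = new MapBackedIndex();
    idx.add(0L, 0);
    idx.add(1L, 1);
    idx.add(0L, 2);
    System.out.println(idx.forValue(0L));   // [0, 2]
    System.out.println(idx.forValue("0"));  // []
  }
}
```

With something like this, `getBitmapColumnIndex` could hand `matchValue` to the index as-is instead of calling `String.valueOf` on it.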



##########
processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.vector.VectorValueMatcher;
+import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.TypeSignature;
+import org.apache.druid.segment.column.TypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.DimensionPredicateFilter;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.filter.PredicateValueMatcherFactory;
+import org.apache.druid.segment.filter.ValueMatchers;
+import org.apache.druid.segment.index.BitmapColumnIndex;
+import org.apache.druid.segment.index.semantic.StringValueSetIndex;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class EqualityFilter extends AbstractOptimizableDimFilter implements Filter
+{
+  private final String column;
+  private final ColumnType matchValueType;
+  private final Object matchValue;
+  @Nullable
+  private final ExtractionFn extractionFn;
+  @Nullable
+  private final FilterTuning filterTuning;
+  private final DruidPredicateFactory predicateFactory;
+
+  @JsonCreator
+  public EqualityFilter(
+      @JsonProperty("column") String column,
+      @JsonProperty("matchValueType") ColumnType matchValueType,
+      @JsonProperty("matchValue") Object matchValue,
+      @JsonProperty("extractionFn") @Nullable ExtractionFn extractionFn,
+      @JsonProperty("filterTuning") @Nullable FilterTuning filterTuning
+  )
+  {
+    if (column == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter, column cannot be null");
+    }
+    this.column = column;
+    if (matchValueType == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter on column [%s], matchValueType cannot be null", column);
+    }
+    this.matchValueType = matchValueType;
+    if (matchValue == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid equality filter on column [%s], matchValue cannot be null", column);
+    }
+    this.matchValue = matchValue;
+    this.extractionFn = extractionFn;
+    this.filterTuning = filterTuning;
+    this.predicateFactory = new EqualityPredicateFactory(matchValue, matchValueType);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    final TypeStrategy<Object> typeStrategy = matchValueType.getStrategy();
+    final int size = typeStrategy.estimateSizeBytes(matchValue);
+    final ByteBuffer valueBuffer = ByteBuffer.allocate(size);
+    typeStrategy.write(valueBuffer, matchValue, size);
+    return new CacheKeyBuilder(DimFilterUtils.EQUALS_CACHE_ID)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendString(column)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendString(matchValueType.asTypeString())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(valueBuffer.array())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(extractionFn == null ? new byte[0] : extractionFn.getCacheKey())
+        .build();
+  }
+
+  @Override
+  public DimFilter optimize()
+  {
+    return this;
+  }
+
+  @Override
+  public Filter toFilter()
+  {
+    if (extractionFn == null) {
+      return this;
+    } else {
+      return new DimensionPredicateFilter(column, predicateFactory, extractionFn, filterTuning);
+    }
+  }
+
+  @JsonProperty
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty
+  public ColumnType getMatchValueType()
+  {
+    return matchValueType;
+  }
+
+  @JsonProperty
+  public Object getMatchValue()
+  {
+    return matchValue;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public ExtractionFn getExtractionFn()
+  {
+    return extractionFn;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public FilterTuning getFilterTuning()
+  {
+    return filterTuning;
+  }
+
+  @Override
+  public String toString()
+  {
+    DimFilter.DimFilterToStringBuilder bob = new DimFilter.DimFilterToStringBuilder()
+        .appendDimension(column, extractionFn)
+        .append(" = ")
+        .append(matchValue);
+
+    if (!ColumnType.STRING.equals(matchValueType)) {
+      bob.append(" (" + matchValueType.asTypeString() + ")");
+    }
+    return bob.appendFilterTuning(filterTuning).build();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    EqualityFilter that = (EqualityFilter) o;
+    if (!column.equals(that.column)) {
+      return false;
+    }
+    if (!Objects.equals(matchValueType, that.matchValueType)) {
+      return false;
+    }
+    if (!Objects.equals(extractionFn, that.extractionFn)) {
+      return false;
+    }
+    if (!Objects.equals(filterTuning, that.filterTuning)) {
+      return false;
+    }
+    if (matchValueType.isArray()) {
+      // just use predicate to see if the values are the same
+      final ExprEval<?> thatValue = ExprEval.ofType(
+          ExpressionType.fromColumnType(that.matchValueType),
+          that.matchValue
+      );
+      final Predicate<Object[]> arrayPredicate = predicateFactory.makeArrayPredicate(matchValueType);
+      return arrayPredicate.apply(thatValue.asArray());
+    } else {
+      return Objects.equals(matchValue, that.matchValue);
+    }
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(column, matchValueType, matchValue, extractionFn, filterTuning);
+  }
+
+  @Override
+  public RangeSet<String> getDimensionRangeSet(String dimension)
+  {
+    if (!Objects.equals(getColumn(), dimension) || getExtractionFn() != null) {
+      return null;
+    }
+    RangeSet<String> retSet = TreeRangeSet.create();
+    retSet.add(Range.singleton(String.valueOf(matchValue)));
+    return retSet;
+  }
+
+  @Nullable
+  @Override
+  public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
+  {
+    if (!Filters.checkFilterTuningUseIndex(column, selector, filterTuning)) {
+      return null;
+    }
+
+    final ColumnIndexSupplier indexSupplier = selector.getIndexSupplier(column);
+    if (indexSupplier == null) {
+      return Filters.makeNullIndex(false, selector);
+    }
+
+    final StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
+    if (valueSetIndex == null) {
+      // column exists, but has no index
+      return null;
+    }
+    return valueSetIndex.forValue(String.valueOf(matchValue));
+  }
+
+  @Override
+  public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
+  {
+    return ColumnProcessors.makeProcessor(
+        column,
+        new TypedConstantValueMatcherFactory(matchValue, matchValueType),
+        factory
+    );
+  }
+
+  @Override
+  public VectorValueMatcher makeVectorMatcher(VectorColumnSelectorFactory factory)
+  {
+    final ColumnCapabilities capabilities = factory.getColumnCapabilities(column);
+
+    if (matchValueType.isPrimitive() && (capabilities == null || capabilities.isPrimitive())) {
+      return ColumnProcessors.makeVectorProcessor(
+          column,
+          VectorValueMatcherColumnProcessorFactory.instance(),
+          factory
+      ).makeMatcher(matchValue, matchValueType);
+    }
+    return ColumnProcessors.makeVectorProcessor(
+        column,
+        VectorValueMatcherColumnProcessorFactory.instance(),
+        factory
+    ).makeMatcher(new EqualityPredicateFactory(matchValue, matchValueType));
+  }
+
+  @Override
+  public boolean supportsSelectivityEstimation(ColumnSelector columnSelector, ColumnIndexSelector indexSelector)
+  {
+    return Filters.supportsSelectivityEstimation(this, column, columnSelector, indexSelector);
+  }
+
+  @Override
+  public boolean canVectorizeMatcher(ColumnInspector inspector)
+  {
+    return true;
+  }
+
+  @Override
+  public Set<String> getRequiredColumns()
+  {
+    return ImmutableSet.of(column);
+  }
+
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(column);
+
+    if (rewriteDimensionTo == null) {
+      throw new IAE(
+          "Received a non-applicable rewrite: %s, filter's dimension: %s",
+          columnRewrites,
+          column
+      );
+    }
+
+    return new EqualityFilter(
+        rewriteDimensionTo,
+        matchValueType,
+        matchValue,
+        extractionFn,
+        filterTuning
+    );
+  }
+
+  private static class EqualityPredicateFactory implements DruidPredicateFactory
+  {
+    private final ExprEval<?> matchValue;
+    private final ColumnType matchValueType;
+
+    private final Object initLock = new Object();
+
+    private volatile DruidLongPredicate longPredicate;
+    private volatile DruidFloatPredicate floatPredicate;
+    private volatile DruidDoublePredicate doublePredicate;
+
+    public EqualityPredicateFactory(Object matchValue, ColumnType matchValueType)
+    {
+      this.matchValue = ExprEval.ofType(ExpressionType.fromColumnType(matchValueType), matchValue);
+      this.matchValueType = matchValueType;

Review Comment:
   I'm a little concerned that this code is force-materializing something it 
doesn't need to. I'm confused about how this would work with something like a 
sketch or other complex object, where we pass in an object that the column 
knows how to deal with but the expression-evaluation system doesn't. I'm 
worried that we will have to teach expressions how to deal with all of the 
types when we've already taught the column how to deal with them, and all we 
really want is for the Object to be passed through unchanged so that the 
column can do the right thing.
   
   So, all of that said: if I'm a column implementation of a relatively complex 
object (say, HllSketch) and I want to support this Equality operator (I want 
someone to be able to pass in the base64-encoded string of the HllSketch and 
find the rows that match it), how can I do that?  Or do I need to create my own 
"HllSketchFilter" and use that instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

