This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e092cc  add MV_FILTER_ONLY, MV_FILTER_NONE, ListFilteredVirtualColumn 
(#11650)
5e092cc is described below

commit 5e092ccb9b9bd6f44b32637ecfd8d5317a542570
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Sep 16 09:31:53 2021 -0700

    add MV_FILTER_ONLY, MV_FILTER_NONE, ListFilteredVirtualColumn (#11650)
    
    * add MV_FILTER_ONLY SQL function, and list filter virtual column
    
    * MV_FILTER_NONE and more tests
    
    * formatting
    
    * o yeah, forgot can do easy thing
    
    * style
    
    * hmm why was that there
    
    * test filtering on virtual column
    
    * style
    
    * meh
    
    * do it right
    
    * good bot
---
 docs/querying/sql.md                               |   2 +
 .../ForwardingFilteredDimensionSelector.java       |  30 +-
 .../query/dimension/ListFilteredDimensionSpec.java |  90 +++---
 .../dimension/PrefixFilteredDimensionSpec.java     |  28 +-
 .../dimension/RegexFilteredDimensionSpec.java      |  28 +-
 .../java/org/apache/druid/segment/IdMapping.java   | 158 ++++++++++
 .../org/apache/druid/segment/VirtualColumn.java    |   1 +
 .../apache/druid/segment/data/GenericIndexed.java  |  21 +-
 .../org/apache/druid/segment/data/Indexed.java     |  42 +++
 .../segment/virtual/ListFilteredVirtualColumn.java | 272 +++++++++++++++++
 .../segment/virtual/VirtualColumnCacheHelper.java  |   1 +
 .../org/apache/druid/segment/IdMappingTest.java    |  96 ++++++
 .../segment/virtual/DummyStringVirtualColumn.java  |   2 +-
 .../ListFilteredVirtualColumnSelectorTest.java     | 286 ++++++++++++++++++
 .../virtual/ListFilteredVirtualColumnTest.java     |  69 +++++
 .../sql/calcite/expression/DruidExpression.java    |  34 ++-
 .../MultiValueStringOperatorConversions.java       | 135 +++++++++
 .../sql/calcite/planner/DruidOperatorTable.java    |   2 +
 .../calcite/CalciteMultiValueStringQueryTest.java  | 326 +++++++++++++++++++++
 19 files changed, 1498 insertions(+), 125 deletions(-)

diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index 18d9e6e..572af2d 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -613,6 +613,8 @@ All 'array' references in the multi-value string function 
documentation can refe
 |Function|Notes|
 |--------|-----|
 | `ARRAY[expr1,expr ...]` | constructs a SQL ARRAY literal from the expression 
arguments, using the type of the first argument as the output array type |
+| `MV_FILTER_ONLY(expr, arr)` | filters multi-value `expr` to include only 
values contained in array `arr` |
+| `MV_FILTER_NONE(expr, arr)` | filters multi-value `expr` to include no 
values contained in array `arr` |
 | `MV_LENGTH(arr)` | returns length of array expression |
 | `MV_OFFSET(arr,long)` | returns the array element at the 0 based index 
supplied, or null for an out of range index|
 | `MV_ORDINAL(arr,long)` | returns the array element at the 1 based index 
supplied, or null for an out of range index |
diff --git 
a/processing/src/main/java/org/apache/druid/query/dimension/ForwardingFilteredDimensionSelector.java
 
b/processing/src/main/java/org/apache/druid/query/dimension/ForwardingFilteredDimensionSelector.java
index 3213f57..130ddb5 100644
--- 
a/processing/src/main/java/org/apache/druid/query/dimension/ForwardingFilteredDimensionSelector.java
+++ 
b/processing/src/main/java/org/apache/druid/query/dimension/ForwardingFilteredDimensionSelector.java
@@ -22,7 +22,6 @@ package org.apache.druid.query.dimension;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
-import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.query.filter.ValueMatcher;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@@ -30,6 +29,7 @@ import org.apache.druid.segment.AbstractDimensionSelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.DimensionSelectorUtils;
 import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.IdMapping;
 import org.apache.druid.segment.data.ArrayBasedIndexedInts;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.filter.BooleanValueMatcher;
@@ -41,18 +41,16 @@ final class ForwardingFilteredDimensionSelector extends 
AbstractDimensionSelecto
 {
   private final DimensionSelector selector;
   private final IdLookup baseIdLookup;
-  private final Int2IntOpenHashMap forwardMapping;
-  private final int[] reverseMapping;
+  private final IdMapping idMapping;
   private final ArrayBasedIndexedInts row = new ArrayBasedIndexedInts();
 
   /**
    * @param selector must return true from {@link 
DimensionSelector#nameLookupPossibleInAdvance()}
-   * @param forwardMapping must have {@link 
Int2IntOpenHashMap#defaultReturnValue(int)} configured to -1.
+   * @param idMapping must have be initialized and populated with the 
dictionary id mapping.
    */
-  ForwardingFilteredDimensionSelector(
+  public ForwardingFilteredDimensionSelector(
       DimensionSelector selector,
-      Int2IntOpenHashMap forwardMapping,
-      int[] reverseMapping
+      IdMapping idMapping
   )
   {
     this.selector = Preconditions.checkNotNull(selector);
@@ -60,11 +58,7 @@ final class ForwardingFilteredDimensionSelector extends 
AbstractDimensionSelecto
       throw new IAE("selector.nameLookupPossibleInAdvance() should return 
true");
     }
     this.baseIdLookup = selector.idLookup();
-    this.forwardMapping = Preconditions.checkNotNull(forwardMapping);
-    if (forwardMapping.defaultReturnValue() != -1) {
-      throw new IAE("forwardMapping.defaultReturnValue() should be -1");
-    }
-    this.reverseMapping = Preconditions.checkNotNull(reverseMapping);
+    this.idMapping = Preconditions.checkNotNull(idMapping);
   }
 
   @Override
@@ -75,7 +69,7 @@ final class ForwardingFilteredDimensionSelector extends 
AbstractDimensionSelecto
     row.ensureSize(baseRowSize);
     int resultSize = 0;
     for (int i = 0; i < baseRowSize; i++) {
-      int forwardedValue = forwardMapping.get(baseRow.get(i));
+      int forwardedValue = idMapping.getForwardedId(baseRow.get(i));
       if (forwardedValue >= 0) {
         row.setValue(resultSize, forwardedValue);
         resultSize++;
@@ -101,7 +95,7 @@ final class ForwardingFilteredDimensionSelector extends 
AbstractDimensionSelecto
             final int baseRowSize = baseRow.size();
             boolean nullRow = true;
             for (int i = 0; i < baseRowSize; i++) {
-              int forwardedValue = forwardMapping.get(baseRow.get(i));
+              int forwardedValue = idMapping.getForwardedId(baseRow.get(i));
               if (forwardedValue >= 0) {
                 // Make the following check after the `forwardedValue >= 0` 
check, because if forwardedValue is -1 and
                 // valueId is -1, we don't want to return true from matches().
@@ -144,7 +138,7 @@ final class ForwardingFilteredDimensionSelector extends 
AbstractDimensionSelecto
         final int baseRowSize = baseRow.size();
         boolean nullRow = true;
         for (int i = 0; i < baseRowSize; ++i) {
-          int forwardedValue = forwardMapping.get(baseRow.get(i));
+          int forwardedValue = idMapping.getForwardedId(baseRow.get(i));
           if (forwardedValue >= 0) {
             if (valueIds.get(forwardedValue)) {
               return true;
@@ -167,13 +161,13 @@ final class ForwardingFilteredDimensionSelector extends 
AbstractDimensionSelecto
   @Override
   public int getValueCardinality()
   {
-    return forwardMapping.size();
+    return idMapping.getValueCardinality();
   }
 
   @Override
   public String lookupName(int id)
   {
-    return selector.lookupName(reverseMapping[id]);
+    return selector.lookupName(idMapping.getReverseId(id));
   }
 
   @Override
@@ -192,7 +186,7 @@ final class ForwardingFilteredDimensionSelector extends 
AbstractDimensionSelecto
   @Override
   public int lookupId(String name)
   {
-    return forwardMapping.get(baseIdLookup.lookupId(name));
+    return idMapping.getForwardedId(baseIdLookup.lookupId(name));
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
 
b/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
index 9c2f6a1..145d3df 100644
--- 
a/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
@@ -22,13 +22,15 @@ package org.apache.druid.query.dimension;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
-import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.IdMapping;
+import org.apache.druid.segment.data.Indexed;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.Set;
@@ -78,64 +80,80 @@ public class ListFilteredDimensionSpec extends 
BaseFilteredDimensionSpec
     }
 
     if (isWhitelist) {
-      return filterWhiteList(selector);
+      return filterAllowList(values, selector);
     } else {
-      return filterBlackList(selector);
+      return filterDenyList(values, selector);
     }
   }
 
-  private DimensionSelector filterWhiteList(DimensionSelector selector)
+  public static IdMapping buildAllowListIdMapping(
+      Set<String> values,
+      int cardinality,
+      @Nullable IdLookup idLookup,
+      Indexed.IndexedGetter<String> fn
+  )
   {
-    final int selectorCardinality = selector.getValueCardinality();
-    if (selectorCardinality < 0 || !selector.nameLookupPossibleInAdvance()) {
-      return new PredicateFilteredDimensionSelector(selector, 
Predicates.in(values));
-    }
-    final int maxPossibleFilteredCardinality = values.size();
-    int count = 0;
-    final Int2IntOpenHashMap forwardMapping = new 
Int2IntOpenHashMap(maxPossibleFilteredCardinality);
-    forwardMapping.defaultReturnValue(-1);
-    final int[] reverseMapping = new int[maxPossibleFilteredCardinality];
-    IdLookup idLookup = selector.idLookup();
+    final IdMapping.Builder builder = 
IdMapping.Builder.ofCardinality(values.size());
     if (idLookup != null) {
       for (String value : values) {
         int i = idLookup.lookupId(value);
         if (i >= 0) {
-          forwardMapping.put(i, count);
-          reverseMapping[count++] = i;
+          builder.addMapping(i);
         }
       }
     } else {
-      for (int i = 0; i < selectorCardinality; i++) {
-        if 
(values.contains(NullHandling.nullToEmptyIfNeeded(selector.lookupName(i)))) {
-          forwardMapping.put(i, count);
-          reverseMapping[count++] = i;
+      for (int i = 0; i < cardinality; i++) {
+        if (values.contains(NullHandling.nullToEmptyIfNeeded(fn.get(i)))) {
+          builder.addMapping(i);
         }
       }
     }
-    return new ForwardingFilteredDimensionSelector(selector, forwardMapping, 
reverseMapping);
+    return builder.build();
+  }
+
+  public static IdMapping buildDenyListIdMapping(
+      Set<String> values,
+      int cardinality,
+      Indexed.IndexedGetter<String> fn
+  )
+  {
+    final IdMapping.Builder builder = 
IdMapping.Builder.ofCardinality(cardinality);
+    for (int i = 0; i < cardinality; i++) {
+      if (!values.contains(NullHandling.nullToEmptyIfNeeded(fn.get(i)))) {
+        builder.addMapping(i);
+      }
+    }
+    return builder.build();
+  }
+
+  public static DimensionSelector filterAllowList(Set<String> values, 
DimensionSelector selector)
+  {
+    if (selector.getValueCardinality() < 0 || 
!selector.nameLookupPossibleInAdvance()) {
+      return new PredicateFilteredDimensionSelector(selector, 
Predicates.in(values));
+    }
+    final IdMapping idMapping = buildAllowListIdMapping(
+        values,
+        selector.getValueCardinality(),
+        selector.idLookup(),
+        selector::lookupName
+    );
+    return new ForwardingFilteredDimensionSelector(selector, idMapping);
   }
 
-  private DimensionSelector filterBlackList(DimensionSelector selector)
+  public static DimensionSelector filterDenyList(Set<String> values, 
DimensionSelector selector)
   {
-    final int selectorCardinality = selector.getValueCardinality();
-    if (selectorCardinality < 0 || !selector.nameLookupPossibleInAdvance()) {
+    if (selector.getValueCardinality() < 0 || 
!selector.nameLookupPossibleInAdvance()) {
       return new PredicateFilteredDimensionSelector(
           selector,
           input -> !values.contains(input)
       );
     }
-    final int maxPossibleFilteredCardinality = selectorCardinality;
-    int count = 0;
-    final Int2IntOpenHashMap forwardMapping = new 
Int2IntOpenHashMap(maxPossibleFilteredCardinality);
-    forwardMapping.defaultReturnValue(-1);
-    final int[] reverseMapping = new int[maxPossibleFilteredCardinality];
-    for (int i = 0; i < selectorCardinality; i++) {
-      if 
(!values.contains(NullHandling.nullToEmptyIfNeeded(selector.lookupName(i)))) {
-        forwardMapping.put(i, count);
-        reverseMapping[count++] = i;
-      }
-    }
-    return new ForwardingFilteredDimensionSelector(selector, forwardMapping, 
reverseMapping);
+    final IdMapping idMapping = buildDenyListIdMapping(
+        values,
+        selector.getValueCardinality(),
+        selector::lookupName
+    );
+    return new ForwardingFilteredDimensionSelector(selector, idMapping);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/dimension/PrefixFilteredDimensionSpec.java
 
b/processing/src/main/java/org/apache/druid/query/dimension/PrefixFilteredDimensionSpec.java
index 3cf8981..5e6467e 100644
--- 
a/processing/src/main/java/org/apache/druid/query/dimension/PrefixFilteredDimensionSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/query/dimension/PrefixFilteredDimensionSpec.java
@@ -21,15 +21,12 @@ package org.apache.druid.query.dimension;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import it.unimi.dsi.fastutil.ints.Int2IntMap;
-import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.IdMapping;
 
-import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 
 /**
@@ -68,33 +65,22 @@ public class PrefixFilteredDimensionSpec extends 
BaseFilteredDimensionSpec
     if (selectorCardinality < 0 || !selector.nameLookupPossibleInAdvance()) {
       return new PredicateFilteredDimensionSelector(
           selector,
-          new Predicate<String>()
-          {
-            @Override
-            public boolean apply(@Nullable String input)
-            {
-              String val = NullHandling.nullToEmptyIfNeeded(input);
-              return val == null ? false : val.startsWith(prefix);
-            }
+          input -> {
+            String val = NullHandling.nullToEmptyIfNeeded(input);
+            return val != null && val.startsWith(prefix);
           }
       );
     }
 
-    int count = 0;
-    final Int2IntOpenHashMap forwardMapping = new Int2IntOpenHashMap();
-    forwardMapping.defaultReturnValue(-1);
+    final IdMapping.Builder builder = IdMapping.Builder.ofUnknownCardinality();
     for (int i = 0; i < selectorCardinality; i++) {
       String val = NullHandling.nullToEmptyIfNeeded(selector.lookupName(i));
       if (val != null && val.startsWith(prefix)) {
-        forwardMapping.put(i, count++);
+        builder.addForwardMapping(i);
       }
     }
 
-    final int[] reverseMapping = new int[forwardMapping.size()];
-    for (Int2IntMap.Entry e : forwardMapping.int2IntEntrySet()) {
-      reverseMapping[e.getIntValue()] = e.getIntKey();
-    }
-    return new ForwardingFilteredDimensionSelector(selector, forwardMapping, 
reverseMapping);
+    return new ForwardingFilteredDimensionSelector(selector, builder.build());
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/dimension/RegexFilteredDimensionSpec.java
 
b/processing/src/main/java/org/apache/druid/query/dimension/RegexFilteredDimensionSpec.java
index 684aae4..6767958 100644
--- 
a/processing/src/main/java/org/apache/druid/query/dimension/RegexFilteredDimensionSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/query/dimension/RegexFilteredDimensionSpec.java
@@ -21,15 +21,12 @@ package org.apache.druid.query.dimension;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import it.unimi.dsi.fastutil.ints.Int2IntMap;
-import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.IdMapping;
 
-import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.regex.Pattern;
 
@@ -72,33 +69,22 @@ public class RegexFilteredDimensionSpec extends 
BaseFilteredDimensionSpec
     if (selectorCardinality < 0 || !selector.nameLookupPossibleInAdvance()) {
       return new PredicateFilteredDimensionSelector(
           selector,
-          new Predicate<String>()
-          {
-            @Override
-            public boolean apply(@Nullable String input)
-            {
-              String val = NullHandling.nullToEmptyIfNeeded(input);
-              return val == null ? false : 
compiledRegex.matcher(val).matches();
-            }
+          input -> {
+            String val = NullHandling.nullToEmptyIfNeeded(input);
+            return val != null && compiledRegex.matcher(val).matches();
           }
       );
     }
 
-    int count = 0;
-    final Int2IntOpenHashMap forwardMapping = new Int2IntOpenHashMap();
-    forwardMapping.defaultReturnValue(-1);
+    final IdMapping.Builder builder = IdMapping.Builder.ofUnknownCardinality();
     for (int i = 0; i < selectorCardinality; i++) {
       String val = NullHandling.nullToEmptyIfNeeded(selector.lookupName(i));
       if (val != null && compiledRegex.matcher(val).matches()) {
-        forwardMapping.put(i, count++);
+        builder.addForwardMapping(i);
       }
     }
 
-    final int[] reverseMapping = new int[forwardMapping.size()];
-    for (Int2IntMap.Entry e : forwardMapping.int2IntEntrySet()) {
-      reverseMapping[e.getIntValue()] = e.getIntKey();
-    }
-    return new ForwardingFilteredDimensionSelector(selector, forwardMapping, 
reverseMapping);
+    return new ForwardingFilteredDimensionSelector(selector, builder.build());
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/IdMapping.java 
b/processing/src/main/java/org/apache/druid/segment/IdMapping.java
new file mode 100644
index 0000000..d250d00
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/IdMapping.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import org.apache.druid.java.util.common.IAE;
+
+/**
+ * Map some set of dictionary id to a smaller set of dictionaryIds (or... the 
same if you are wasteful), providing
+ * a translation layer between them
+ */
+public class IdMapping
+{
+  private final Int2IntOpenHashMap forwardMapping;
+  private final int[] reverseMapping;
+
+  private IdMapping(Int2IntOpenHashMap forwardMapping, int[] reverseMapping)
+  {
+    this.forwardMapping = forwardMapping;
+    if (forwardMapping.defaultReturnValue() != -1) {
+      throw new IAE("forwardMapping.defaultReturnValue() should be -1");
+    }
+    this.reverseMapping = reverseMapping;
+  }
+
+  /**
+   * Get the substitute index, given the original
+   */
+  public int getForwardedId(int id)
+  {
+    return forwardMapping.get(id);
+  }
+
+  /**
+   * Get the original index, given the substitute
+   */
+  public int getReverseId(int id)
+  {
+    if (id < 0 || id >= reverseMapping.length) {
+      return -1;
+    }
+    return reverseMapping[id];
+  }
+
+  /**
+   * Get the number of values stored in {@link #forwardMapping}
+   */
+  public int getValueCardinality()
+  {
+    return forwardMapping.size();
+  }
+
+  public static class Builder
+  {
+    /**
+     * Create a {@link IdMapping} where the cardinality is known, to use with
+     * {@link #addMapping(int)} to simultaneously populate both {@link 
#forwardMapping} and {@link #reverseMapping}.
+     */
+    public static Builder ofCardinality(int cardinality)
+    {
+      final Int2IntOpenHashMap forwardMapping = new 
Int2IntOpenHashMap(cardinality);
+      forwardMapping.defaultReturnValue(-1);
+
+      final int[] reverseMapping = new int[cardinality];
+      return new Builder(forwardMapping, reverseMapping, true);
+    }
+
+    /**
+     * Create an {@link IdMapping} where the cardinality is not known up 
front. Since cardinality
+     * is unknown, {@link #reverseMapping} is initialized to an empty array, 
and {@link #addForwardMapping(int)} should
+     * be used to build the {@link #forwardMapping}, when {@link #build()} is 
called {@link #reverseMapping} will be
+     * computed to produce the completed {@link IdMapping}. This is less 
efficient than
+     * {@link #ofCardinality(int)}, so it should be preferred if the value 
cardinality is in fact known
+     */
+    public static Builder ofUnknownCardinality()
+    {
+      final Int2IntOpenHashMap forwardMapping = new Int2IntOpenHashMap();
+      forwardMapping.defaultReturnValue(-1);
+
+      return new Builder(forwardMapping, new int[0], false);
+    }
+
+    private final Int2IntOpenHashMap forwardMapping;
+    private final int[] reverseMapping;
+    private int count = 0;
+    private final boolean knownCardinality;
+
+    private Builder(
+        Int2IntOpenHashMap forwardMapping,
+        int[] reverseMapping,
+        boolean knownCardinality
+    )
+    {
+      this.forwardMapping = forwardMapping;
+      this.reverseMapping = reverseMapping;
+      this.knownCardinality = knownCardinality;
+    }
+
+    /**
+     * Adds a lookup for the 'forwarded' index to the current highest index 
value, and a reverse lookup from this
+     * substitute index back to the original. This method can only be called 
when cardinality is known, to ensure that
+     * {@link #reverseMapping} is initialized correctly.
+     */
+    public void addMapping(int forwardId)
+    {
+      if (!knownCardinality) {
+        throw new IAE("Cannot call addMapping on unknown cardinality, use 
addForwardMapping instead");
+      }
+      forwardMapping.put(forwardId, count);
+      reverseMapping[count++] = forwardId;
+    }
+
+    /**
+     * Adds a lookup for the 'forwarded' index to the current highest index 
value. This method can only be used when
+     * the cardinality is unknown, otherwise {@link #addMapping(int)} should 
be used instead since the
+     * {@link #reverseMapping} is only computed if {@link #knownCardinality} 
is false
+     */
+
+    public void addForwardMapping(int forwardId)
+    {
+      if (knownCardinality) {
+        throw new IAE("Cannot call addForwardMapping on known cardinality, use 
addMapping instead");
+      }
+      forwardMapping.put(forwardId, count++);
+    }
+
+    public IdMapping build()
+    {
+      if (knownCardinality) {
+        return new IdMapping(forwardMapping, reverseMapping);
+      } else {
+        final int[] reverseMapping = new int[forwardMapping.size()];
+        for (Int2IntMap.Entry e : forwardMapping.int2IntEntrySet()) {
+          reverseMapping[e.getIntValue()] = e.getIntKey();
+        }
+        return new IdMapping(forwardMapping, reverseMapping);
+      }
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/VirtualColumn.java 
b/processing/src/main/java/org/apache/druid/segment/VirtualColumn.java
index 3193ad5..c414600 100644
--- a/processing/src/main/java/org/apache/druid/segment/VirtualColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/VirtualColumn.java
@@ -299,6 +299,7 @@ public interface VirtualColumn extends Cacheable
    * @return BitmapIndex
    */
   @SuppressWarnings("unused")
+  @Nullable
   default BitmapIndex getBitmapIndex(String columnName, ColumnSelector 
selector)
   {
     throw new UnsupportedOperationException("not supported");
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java 
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
index 74e1d76..178dd57 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
@@ -355,26 +355,7 @@ public class GenericIndexed<T> implements 
CloseableIndexed<T>, Serializer
     if (!allowReverseLookup) {
       throw new UnsupportedOperationException("Reverse lookup not allowed.");
     }
-
-    int minIndex = 0;
-    int maxIndex = size - 1;
-    while (minIndex <= maxIndex) {
-      int currIndex = (minIndex + maxIndex) >>> 1;
-
-      T currValue = indexed.get(currIndex);
-      int comparison = strategy.compare(currValue, value);
-      if (comparison == 0) {
-        return currIndex;
-      }
-
-      if (comparison < 0) {
-        minIndex = currIndex + 1;
-      } else {
-        maxIndex = currIndex - 1;
-      }
-    }
-
-    return -(minIndex + 1);
+    return Indexed.indexOf(indexed::get, size, strategy, value);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/Indexed.java 
b/processing/src/main/java/org/apache/druid/segment/data/Indexed.java
index f761d25..1c4499f 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/Indexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/Indexed.java
@@ -24,6 +24,7 @@ import 
org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop;
 import org.apache.druid.query.monomorphicprocessing.HotLoopCallee;
 
 import javax.annotation.Nullable;
+import java.util.Comparator;
 
 @PublicApi
 public interface Indexed<T> extends Iterable<T>, HotLoopCallee
@@ -45,4 +46,45 @@ public interface Indexed<T> extends Iterable<T>, 
HotLoopCallee
    * @return index of value, or a negative number
    */
   int indexOf(@Nullable T value);
+
+  /**
+   * Returns the index of "value" in some object whose values are accessible 
by index some {@link IndexedGetter}, or
+   * (-(insertion point) - 1) if the value is not present, in the manner of 
Arrays.binarySearch.
+   *
+   * This is used by {@link GenericIndexed} to strengthen the contract of 
{@link #indexOf(Object)}, which only
+   * guarantees that values-not-found will return some negative number.
+   *
+   * @param value value to search for
+   *
+   * @return index of value, or negative number equal to (-(insertion point) - 
1).
+   */
+  static <T> int indexOf(IndexedGetter<T> indexed, int size, Comparator<T> 
comparator, @Nullable T value)
+  {
+    int minIndex = 0;
+    int maxIndex = size - 1;
+    while (minIndex <= maxIndex) {
+      int currIndex = (minIndex + maxIndex) >>> 1;
+
+      T currValue = indexed.get(currIndex);
+      int comparison = comparator.compare(currValue, value);
+      if (comparison == 0) {
+        return currIndex;
+      }
+
+      if (comparison < 0) {
+        minIndex = currIndex + 1;
+      } else {
+        maxIndex = currIndex - 1;
+      }
+    }
+
+    return -(minIndex + 1);
+  }
+
+  @FunctionalInterface
+  interface IndexedGetter<T>
+  {
+    @Nullable
+    T get(int id);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/ListFilteredVirtualColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/virtual/ListFilteredVirtualColumn.java
new file mode 100644
index 0000000..72d04af
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/ListFilteredVirtualColumn.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.virtual;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.dimension.ListFilteredDimensionSpec;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.IdMapping;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.BitmapIndex;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * {@link VirtualColumn} form of {@link ListFilteredDimensionSpec}, powered by
+ * {@link org.apache.druid.query.dimension.ForwardingFilteredDimensionSelector}
+ */
+public class ListFilteredVirtualColumn implements VirtualColumn
+{
+  private final String name;
+  private final DimensionSpec delegate;
+  private final Set<String> values;
+  private final boolean allowList;
+
+  @JsonCreator
+  public ListFilteredVirtualColumn(
+      @JsonProperty("name") String name,
+      @JsonProperty("delegate") DimensionSpec delegate,
+      @JsonProperty("values") Set<String> values,
+      @JsonProperty("isAllowList") @Nullable Boolean isAllowList
+  )
+  {
+    this.name = Preconditions.checkNotNull(name, "name");
+    this.delegate = delegate;
+    this.values = values;
+    this.allowList = isAllowList == null ? true : isAllowList.booleanValue();
+  }
+
+
+  @JsonProperty("name")
+  @Override
+  public String getOutputName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public Set<String> getValues()
+  {
+    return values;
+  }
+
+  @JsonProperty("isAllowList")
+  public boolean isAllowList()
+  {
+    return allowList;
+  }
+
+  @JsonProperty
+  public DimensionSpec getDelegate()
+  {
+    return delegate;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    CacheKeyBuilder builder = new 
CacheKeyBuilder(VirtualColumnCacheHelper.CACHE_TYPE_ID_LIST_FILTERED)
+        .appendString(name)
+        .appendCacheable(delegate)
+        .appendStringsIgnoringOrder(values)
+        .appendBoolean(allowList);
+    return builder.build();
+  }
+
+
+  @Override
+  public DimensionSelector makeDimensionSelector(
+      DimensionSpec dimensionSpec,
+      ColumnSelectorFactory factory
+  )
+  {
+    if (allowList) {
+      return ListFilteredDimensionSpec.filterAllowList(values, 
factory.makeDimensionSelector(delegate));
+    } else {
+      return ListFilteredDimensionSpec.filterDenyList(values, 
factory.makeDimensionSelector(delegate));
+    }
+  }
+
+  @Override
+  public ColumnValueSelector<?> makeColumnValueSelector(
+      String columnName,
+      ColumnSelectorFactory factory
+  )
+  {
+    return makeDimensionSelector(DefaultDimensionSpec.of(columnName), factory);
+  }
+
+  @Override
+  public ColumnCapabilities capabilities(String columnName)
+  {
+    return new ColumnCapabilitiesImpl().setType(delegate.getOutputType())
+                                       .setDictionaryEncoded(true)
+                                       .setHasBitmapIndexes(true);
+  }
+
+  @Override
+  public ColumnCapabilities capabilities(ColumnInspector inspector, String 
columnName)
+  {
+    return inspector.getColumnCapabilities(delegate.getDimension());
+  }
+
+  @Override
+  public List<String> requiredColumns()
+  {
+    return Collections.singletonList(delegate.getDimension());
+  }
+
+  @Override
+  public boolean usesDotNotation()
+  {
+    return false;
+  }
+
+  @Override
+  public @Nullable BitmapIndex getBitmapIndex(
+      String columnName,
+      ColumnSelector selector
+  )
+  {
+    final ColumnHolder holder = 
selector.getColumnHolder(delegate.getDimension());
+    if (holder == null) {
+      return null;
+    }
+    final BitmapIndex underlyingIndex = holder.getBitmapIndex();
+    if (underlyingIndex == null) {
+      return null;
+    }
+    final IdMapping idMapping;
+    if (allowList) {
+      idMapping = ListFilteredDimensionSpec.buildAllowListIdMapping(
+          values,
+          underlyingIndex.getCardinality(),
+          null,
+          underlyingIndex::getValue
+      );
+    } else {
+      idMapping = ListFilteredDimensionSpec.buildDenyListIdMapping(
+          values,
+          underlyingIndex.getCardinality(),
+          underlyingIndex::getValue
+      );
+    }
+
+    return new ListFilteredBitmapIndex(underlyingIndex, idMapping);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ListFilteredVirtualColumn that = (ListFilteredVirtualColumn) o;
+    return allowList == that.allowList && name.equals(that.name) && 
delegate.equals(that.delegate) && values.equals(
+        that.values);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(name, delegate, values, allowList);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ListFilteredVirtualColumn{" +
+           "name='" + name + '\'' +
+           ", delegate=" + delegate +
+           ", values=" + values +
+           ", isAllowList=" + allowList +
+           '}';
+  }
+
+  private static class ListFilteredBitmapIndex implements BitmapIndex
+  {
+    final BitmapIndex delegate;
+    final IdMapping idMapping;
+
+    private ListFilteredBitmapIndex(BitmapIndex delegate, IdMapping idMapping)
+    {
+      this.delegate = delegate;
+      this.idMapping = idMapping;
+    }
+
+    @Override
+    public String getValue(int index)
+    {
+      return delegate.getValue(idMapping.getReverseId(index));
+    }
+
+    @Override
+    public boolean hasNulls()
+    {
+      return delegate.hasNulls();
+    }
+
+    @Override
+    public BitmapFactory getBitmapFactory()
+    {
+      return delegate.getBitmapFactory();
+    }
+
+    @Override
+    public ImmutableBitmap getBitmap(int idx)
+    {
+      return delegate.getBitmap(idMapping.getReverseId(idx));
+    }
+    @Override
+    public int getCardinality()
+    {
+      return idMapping.getValueCardinality();
+    }
+
+    @Override
+    public int getIndex(@Nullable String value)
+    {
+      return Indexed.indexOf(this::getValue, getCardinality(), 
Comparators.naturalNullsFirst(), value);
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/VirtualColumnCacheHelper.java
 
b/processing/src/main/java/org/apache/druid/segment/virtual/VirtualColumnCacheHelper.java
index a116c0b..d5e4462 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/virtual/VirtualColumnCacheHelper.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/VirtualColumnCacheHelper.java
@@ -23,6 +23,7 @@ public class VirtualColumnCacheHelper
 {
   public static final byte CACHE_TYPE_ID_MAP = 0x00;
   public static final byte CACHE_TYPE_ID_EXPRESSION = 0x01;
+  public static final byte CACHE_TYPE_ID_LIST_FILTERED = 0x02;
 
   // Starting byte 0xFF is reserved for site-specific virtual columns.
   @SuppressWarnings("unused")
diff --git 
a/processing/src/test/java/org/apache/druid/segment/IdMappingTest.java 
b/processing/src/test/java/org/apache/druid/segment/IdMappingTest.java
new file mode 100644
index 0000000..b1f9004
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/IdMappingTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.IAE;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class IdMappingTest
+{
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testMappingKnownCardinality()
+  {
+    final int cardinality = 10;
+    IdMapping.Builder builder = IdMapping.Builder.ofCardinality(cardinality);
+    for (int i = 0; i < cardinality; i++) {
+      builder.addMapping(i * 10);
+    }
+    IdMapping mapping = builder.build();
+    for (int i = 0; i < cardinality; i++) {
+      Assert.assertEquals(i * 10, mapping.getReverseId(i));
+      Assert.assertEquals(i, mapping.getForwardedId(i * 10));
+    }
+
+    Assert.assertEquals(-1, mapping.getForwardedId(1));
+    Assert.assertEquals(-1, mapping.getForwardedId(-1));
+    Assert.assertEquals(-1, mapping.getReverseId(-1));
+    Assert.assertEquals(-1, mapping.getReverseId(10));
+  }
+
+  @Test
+  public void testMappingUnknownCardinality()
+  {
+    final int cardinality = 10;
+    IdMapping.Builder builder = IdMapping.Builder.ofUnknownCardinality();
+    for (int i = 0; i < cardinality; i++) {
+      builder.addForwardMapping(i * 10);
+    }
+    IdMapping mapping = builder.build();
+    for (int i = 0; i < cardinality; i++) {
+      Assert.assertEquals(i * 10, mapping.getReverseId(i));
+      Assert.assertEquals(i, mapping.getForwardedId(i * 10));
+    }
+
+    Assert.assertEquals(-1, mapping.getForwardedId(1));
+    Assert.assertEquals(-1, mapping.getForwardedId(-1));
+    Assert.assertEquals(-1, mapping.getReverseId(-1));
+    Assert.assertEquals(-1, mapping.getReverseId(10));
+  }
+
+  @Test
+  public void testMappingCardinalityUnknownKnown()
+  {
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("addForwardMapping instead");
+    final int cardinality = 10;
+    IdMapping.Builder builder = IdMapping.Builder.ofUnknownCardinality();
+    for (int i = 0; i < cardinality; i++) {
+      builder.addMapping(i * 10);
+    }
+  }
+
+  @Test
+  public void testMappingCardinalityKnownUnknown()
+  {
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("addMapping instead");
+    final int cardinality = 10;
+    IdMapping.Builder builder = IdMapping.Builder.ofCardinality(cardinality);
+    for (int i = 0; i < cardinality; i++) {
+      builder.addForwardMapping(i * 10);
+    }
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/segment/virtual/DummyStringVirtualColumn.java
 
b/processing/src/test/java/org/apache/druid/segment/virtual/DummyStringVirtualColumn.java
index b532f78..b413424 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/virtual/DummyStringVirtualColumn.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/virtual/DummyStringVirtualColumn.java
@@ -166,7 +166,7 @@ public class DummyStringVirtualColumn implements 
VirtualColumn
   }
 
   @Override
-  public BitmapIndex getBitmapIndex(String columnName, ColumnSelector 
columnSelector)
+  public @Nullable BitmapIndex getBitmapIndex(String columnName, 
ColumnSelector columnSelector)
   {
     if (enableBitmaps) {
       ColumnHolder holder = columnSelector.getColumnHolder(baseColumnName);
diff --git 
a/processing/src/test/java/org/apache/druid/segment/virtual/ListFilteredVirtualColumnSelectorTest.java
 
b/processing/src/test/java/org/apache/druid/segment/virtual/ListFilteredVirtualColumnSelectorTest.java
new file mode 100644
index 0000000..cc7f5e4
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/virtual/ListFilteredVirtualColumnSelectorTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.virtual;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.segment.ColumnSelector;
+import org.apache.druid.segment.ColumnSelectorBitmapIndexSelector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.RowAdapters;
+import org.apache.druid.segment.RowBasedColumnSelectorFactory;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.BitmapIndex;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.SelectorFilter;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class ListFilteredVirtualColumnSelectorTest extends 
InitializedNullHandlingTest
+{
+  private static final String COLUMN_NAME = "x";
+  private static final String NON_EXISTENT_COLUMN_NAME = "nope";
+  private static final String ALLOW_VIRTUAL_NAME = "allowed";
+  private static final String DENY_VIRTUAL_NAME = "no-stairway";
+  private final RowSignature rowSignature = RowSignature.builder()
+                                                        .addTimeColumn()
+                                                        
.addDimensions(ImmutableList.of(DefaultDimensionSpec.of(COLUMN_NAME)))
+                                                        .build();
+
+
+  @Test
+  public void testListFilteredVirtualColumnNilDimensionSelector()
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        ALLOW_VIRTUAL_NAME,
+        new DefaultDimensionSpec(NON_EXISTENT_COLUMN_NAME, 
NON_EXISTENT_COLUMN_NAME, ValueType.STRING),
+        ImmutableSet.of("a", "b"),
+        true
+    );
+
+    VirtualizedColumnSelectorFactory selectorFactory = 
makeSelectorFactory(virtualColumn);
+    DimensionSelector selector = 
selectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(ALLOW_VIRTUAL_NAME));
+    Assert.assertNull(selector.getObject());
+  }
+
+  @Test
+  public void testListFilteredVirtualColumnNilColumnValueSelector()
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        ALLOW_VIRTUAL_NAME,
+        new DefaultDimensionSpec(NON_EXISTENT_COLUMN_NAME, 
NON_EXISTENT_COLUMN_NAME, ValueType.STRING),
+        ImmutableSet.of("a", "b"),
+        true
+    );
+
+    VirtualizedColumnSelectorFactory selectorFactory = 
makeSelectorFactory(virtualColumn);
+    ColumnValueSelector<?> selector = 
selectorFactory.makeColumnValueSelector(ALLOW_VIRTUAL_NAME);
+    Assert.assertNull(selector.getObject());
+  }
+
+
+  @Test
+  public void testListFilteredVirtualColumnAllowListDimensionSelector()
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        ALLOW_VIRTUAL_NAME,
+        new DefaultDimensionSpec(COLUMN_NAME, COLUMN_NAME, ValueType.STRING),
+        ImmutableSet.of("a", "b"),
+        true
+    );
+
+    VirtualizedColumnSelectorFactory selectorFactory = 
makeSelectorFactory(virtualColumn);
+    DimensionSelector selector = 
selectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(ALLOW_VIRTUAL_NAME));
+    Assert.assertEquals(ImmutableList.of("a", "b"), selector.getObject());
+    assertCapabilities(selectorFactory, ALLOW_VIRTUAL_NAME);
+  }
+
+  @Test
+  public void testListFilteredVirtualColumnAllowListColumnValueSelector()
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        ALLOW_VIRTUAL_NAME,
+        new DefaultDimensionSpec(COLUMN_NAME, COLUMN_NAME, ValueType.STRING),
+        ImmutableSet.of("a", "b"),
+        true
+    );
+
+    VirtualizedColumnSelectorFactory selectorFactory = 
makeSelectorFactory(virtualColumn);
+    ColumnValueSelector<?> selector = 
selectorFactory.makeColumnValueSelector(ALLOW_VIRTUAL_NAME);
+    Assert.assertEquals(ImmutableList.of("a", "b"), selector.getObject());
+    assertCapabilities(selectorFactory, ALLOW_VIRTUAL_NAME);
+  }
+
+  @Test
+  public void testListFilteredVirtualColumnDenyListDimensionSelector()
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        DENY_VIRTUAL_NAME,
+        new DefaultDimensionSpec(COLUMN_NAME, COLUMN_NAME, ValueType.STRING),
+        ImmutableSet.of("a", "b"),
+        false
+    );
+
+    VirtualizedColumnSelectorFactory selectorFactory = 
makeSelectorFactory(virtualColumn);
+    DimensionSelector selector = 
selectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(DENY_VIRTUAL_NAME));
+    Assert.assertEquals(ImmutableList.of("c", "d"), selector.getObject());
+    assertCapabilities(selectorFactory, DENY_VIRTUAL_NAME);
+  }
+
+  @Test
+  public void testListFilteredVirtualColumnDenyListColumnValueSelector()
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        DENY_VIRTUAL_NAME,
+        new DefaultDimensionSpec(COLUMN_NAME, COLUMN_NAME, ValueType.STRING),
+        ImmutableSet.of("a", "b"),
+        false
+    );
+
+    VirtualizedColumnSelectorFactory selectorFactory = 
makeSelectorFactory(virtualColumn);
+    ColumnValueSelector<?> selector = 
selectorFactory.makeColumnValueSelector(DENY_VIRTUAL_NAME);
+    Assert.assertEquals(ImmutableList.of("c", "d"), selector.getObject());
+    assertCapabilities(selectorFactory, DENY_VIRTUAL_NAME);
+  }
+
+  @Test
+  public void testFilterListFilteredVirtualColumnAllowIndex()
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        ALLOW_VIRTUAL_NAME,
+        new DefaultDimensionSpec(COLUMN_NAME, COLUMN_NAME, ValueType.STRING),
+        ImmutableSet.of("b", "c"),
+        true
+    );
+
+    ColumnSelector selector = EasyMock.createMock(ColumnSelector.class);
+    ColumnHolder holder = EasyMock.createMock(ColumnHolder.class);
+    BitmapIndex index = EasyMock.createMock(BitmapIndex.class);
+    ImmutableBitmap bitmap = EasyMock.createMock(ImmutableBitmap.class);
+    BitmapFactory bitmapFactory = EasyMock.createMock(BitmapFactory.class);
+
+    
EasyMock.expect(selector.getColumnHolder(COLUMN_NAME)).andReturn(holder).atLeastOnce();
+
+    EasyMock.expect(holder.getBitmapIndex()).andReturn(index).atLeastOnce();
+
+    EasyMock.expect(index.getCardinality()).andReturn(3).atLeastOnce();
+    EasyMock.expect(index.getValue(0)).andReturn("a").atLeastOnce();
+    EasyMock.expect(index.getValue(1)).andReturn("b").atLeastOnce();
+    EasyMock.expect(index.getValue(2)).andReturn("c").atLeastOnce();
+
+    EasyMock.expect(index.getBitmap(2)).andReturn(bitmap).once();
+    EasyMock.expect(index.getBitmapFactory()).andReturn(bitmapFactory).once();
+    EasyMock.expect(index.hasNulls()).andReturn(true).once();
+
+    EasyMock.replay(selector, holder, index, bitmap, bitmapFactory);
+
+    ColumnSelectorBitmapIndexSelector bitmapIndexSelector = new 
ColumnSelectorBitmapIndexSelector(
+        new RoaringBitmapFactory(),
+        VirtualColumns.create(Collections.singletonList(virtualColumn)),
+        selector
+    );
+
+    SelectorFilter filter = new SelectorFilter(ALLOW_VIRTUAL_NAME, "a");
+    Assert.assertTrue(filter.shouldUseBitmapIndex(bitmapIndexSelector));
+
+    BitmapIndex listFilteredIndex = 
bitmapIndexSelector.getBitmapIndex(ALLOW_VIRTUAL_NAME);
+    Assert.assertEquals(-1, listFilteredIndex.getIndex("a"));
+    Assert.assertEquals(0, listFilteredIndex.getIndex("b"));
+    Assert.assertEquals(1, listFilteredIndex.getIndex("c"));
+    Assert.assertEquals(2, listFilteredIndex.getCardinality());
+    Assert.assertEquals("b", listFilteredIndex.getValue(0));
+    Assert.assertEquals("c", listFilteredIndex.getValue(1));
+    Assert.assertEquals(bitmap, listFilteredIndex.getBitmap(1));
+    Assert.assertEquals(bitmapFactory, listFilteredIndex.getBitmapFactory());
+    Assert.assertTrue(listFilteredIndex.hasNulls());
+
+    EasyMock.verify(selector, holder, index, bitmap, bitmapFactory);
+  }
+
+  @Test
+  public void testFilterListFilteredVirtualColumnDenyIndex()
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        DENY_VIRTUAL_NAME,
+        new DefaultDimensionSpec(COLUMN_NAME, COLUMN_NAME, ValueType.STRING),
+        ImmutableSet.of("a", "b"),
+        false
+    );
+
+
+    ColumnSelector selector = EasyMock.createMock(ColumnSelector.class);
+    ColumnHolder holder = EasyMock.createMock(ColumnHolder.class);
+    BitmapIndex index = EasyMock.createMock(BitmapIndex.class);
+    ImmutableBitmap bitmap = EasyMock.createMock(ImmutableBitmap.class);
+    BitmapFactory bitmapFactory = EasyMock.createMock(BitmapFactory.class);
+
+    
EasyMock.expect(selector.getColumnHolder(COLUMN_NAME)).andReturn(holder).atLeastOnce();
+
+    EasyMock.expect(holder.getBitmapIndex()).andReturn(index).atLeastOnce();
+
+    EasyMock.expect(index.getCardinality()).andReturn(3).atLeastOnce();
+    EasyMock.expect(index.getValue(0)).andReturn("a").atLeastOnce();
+    EasyMock.expect(index.getValue(1)).andReturn("b").atLeastOnce();
+    EasyMock.expect(index.getValue(2)).andReturn("c").atLeastOnce();
+
+    EasyMock.expect(index.getBitmap(0)).andReturn(bitmap).once();
+    EasyMock.expect(index.getBitmapFactory()).andReturn(bitmapFactory).once();
+    EasyMock.expect(index.hasNulls()).andReturn(true).once();
+
+    EasyMock.replay(selector, holder, index, bitmap, bitmapFactory);
+
+    ColumnSelectorBitmapIndexSelector bitmapIndexSelector = new 
ColumnSelectorBitmapIndexSelector(
+        new RoaringBitmapFactory(),
+        VirtualColumns.create(Collections.singletonList(virtualColumn)),
+        selector
+    );
+
+    SelectorFilter filter = new SelectorFilter(DENY_VIRTUAL_NAME, "c");
+    Assert.assertTrue(filter.shouldUseBitmapIndex(bitmapIndexSelector));
+
+    BitmapIndex listFilteredIndex = 
bitmapIndexSelector.getBitmapIndex(DENY_VIRTUAL_NAME);
+    Assert.assertEquals(-1, listFilteredIndex.getIndex("a"));
+    Assert.assertEquals(-1, listFilteredIndex.getIndex("b"));
+    Assert.assertEquals(0, listFilteredIndex.getIndex("c"));
+    Assert.assertEquals(1, listFilteredIndex.getCardinality());
+    Assert.assertEquals(bitmap, listFilteredIndex.getBitmap(1));
+    Assert.assertEquals(bitmapFactory, listFilteredIndex.getBitmapFactory());
+    Assert.assertTrue(listFilteredIndex.hasNulls());
+
+    EasyMock.verify(selector, holder, index, bitmap, bitmapFactory);
+  }
+
+  private void assertCapabilities(VirtualizedColumnSelectorFactory 
selectorFactory, String columnName)
+  {
+    ColumnCapabilities capabilities = 
selectorFactory.getColumnCapabilities(columnName);
+    Assert.assertNotNull(capabilities);
+    Assert.assertEquals(ValueType.STRING, capabilities.getType());
+    Assert.assertTrue(capabilities.hasMultipleValues().isMaybeTrue());
+  }
+
+  private VirtualizedColumnSelectorFactory 
makeSelectorFactory(ListFilteredVirtualColumn virtualColumn)
+  {
+    VirtualizedColumnSelectorFactory selectorFactory = new 
VirtualizedColumnSelectorFactory(
+        RowBasedColumnSelectorFactory.create(
+            RowAdapters.standardRow(),
+            () -> new MapBasedRow(0L, ImmutableMap.of(COLUMN_NAME, 
ImmutableList.of("a", "b", "c", "d"))),
+            rowSignature,
+            false
+        ),
+        VirtualColumns.create(ImmutableList.of(virtualColumn))
+    );
+
+    return selectorFactory;
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/segment/virtual/ListFilteredVirtualColumnTest.java
 
b/processing/src/test/java/org/apache/druid/segment/virtual/ListFilteredVirtualColumnTest.java
new file mode 100644
index 0000000..7238ac2
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/virtual/ListFilteredVirtualColumnTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.virtual;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ValueType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ListFilteredVirtualColumnTest
+{
+  private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
+
+  @Test
+  public void testSerdeAllowList() throws JsonProcessingException
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        "hello",
+        new DefaultDimensionSpec("column", "output", ValueType.STRING),
+        ImmutableSet.of("foo", "bar"),
+        true
+    );
+    ListFilteredVirtualColumn roundTrip = 
MAPPER.readValue(MAPPER.writeValueAsString(virtualColumn), 
ListFilteredVirtualColumn.class);
+    Assert.assertEquals(virtualColumn, roundTrip);
+    Assert.assertArrayEquals(virtualColumn.getCacheKey(), 
roundTrip.getCacheKey());
+  }
+
+  @Test
+  public void testSerdeDenyList() throws JsonProcessingException
+  {
+    ListFilteredVirtualColumn virtualColumn = new ListFilteredVirtualColumn(
+        "hello",
+        new DefaultDimensionSpec("column", "output", ValueType.STRING),
+        ImmutableSet.of("foo", "bar"),
+        false
+    );
+    ListFilteredVirtualColumn roundTrip = 
MAPPER.readValue(MAPPER.writeValueAsString(virtualColumn), 
ListFilteredVirtualColumn.class);
+    Assert.assertEquals(virtualColumn, roundTrip);
+    Assert.assertArrayEquals(virtualColumn.getCacheKey(), 
roundTrip.getCacheKey());
+  }
+
+  @Test
+  public void testEqualsAndHashcode()
+  {
+    
EqualsVerifier.forClass(ListFilteredVirtualColumn.class).usingGetClass().verify();
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/DruidExpression.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/DruidExpression.java
index 6e0e80c..fc51ecd 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/DruidExpression.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/DruidExpression.java
@@ -23,12 +23,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.io.BaseEncoding;
 import com.google.common.primitives.Chars;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.nary.TrinaryFn;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.math.expr.Parser;
+import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 
+import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
@@ -49,33 +52,47 @@ public class DruidExpression
     Arrays.sort(SAFE_CHARS);
   }
 
+  @Nullable
   private final SimpleExtraction simpleExtraction;
   private final String expression;
+  private final TrinaryFn<String, ValueType, ExprMacroTable, VirtualColumn> 
virtualColumnFn;
 
-  private DruidExpression(final SimpleExtraction simpleExtraction, final 
String expression)
+  private DruidExpression(@Nullable final SimpleExtraction simpleExtraction, 
final String expression, @Nullable final TrinaryFn<String, ValueType, 
ExprMacroTable, VirtualColumn> virtualColumnFn)
   {
     this.simpleExtraction = simpleExtraction;
     this.expression = Preconditions.checkNotNull(expression);
+    this.virtualColumnFn = virtualColumnFn != null
+                           ? virtualColumnFn
+                           : (name, outputType, macroTable) ->
+                               new ExpressionVirtualColumn(name, expression, 
outputType, macroTable);
   }
 
   public static DruidExpression of(final SimpleExtraction simpleExtraction, 
final String expression)
   {
-    return new DruidExpression(simpleExtraction, expression);
+    return new DruidExpression(simpleExtraction, expression, null);
   }
 
   public static DruidExpression fromColumn(final String column)
   {
-    return new DruidExpression(SimpleExtraction.of(column, null), 
StringUtils.format("\"%s\"", escape(column)));
+    return new DruidExpression(SimpleExtraction.of(column, null), 
StringUtils.format("\"%s\"", escape(column)), null);
   }
 
   public static DruidExpression fromExpression(final String expression)
   {
-    return new DruidExpression(null, expression);
+    return new DruidExpression(null, expression, null);
   }
 
   public static DruidExpression fromFunctionCall(final String functionName, 
final List<DruidExpression> args)
   {
-    return new DruidExpression(null, functionCall(functionName, args));
+    return new DruidExpression(null, functionCall(functionName, args), null);
+  }
+
+  public static DruidExpression forVirtualColumn(
+      final String expression,
+      final TrinaryFn<String, ValueType, ExprMacroTable, VirtualColumn> 
virtualColumnFunction
+  )
+  {
+    return new DruidExpression(null, expression, virtualColumnFunction);
   }
 
   public static String numberLiteral(final Number n)
@@ -163,13 +180,13 @@ public class DruidExpression
     return Preconditions.checkNotNull(simpleExtraction);
   }
 
-  public ExpressionVirtualColumn toVirtualColumn(
+  public VirtualColumn toVirtualColumn(
       final String name,
       final ValueType outputType,
       final ExprMacroTable macroTable
   )
   {
-    return new ExpressionVirtualColumn(name, expression, outputType, 
macroTable);
+    return virtualColumnFn.apply(name, outputType, macroTable);
   }
 
   public DruidExpression map(
@@ -179,7 +196,8 @@ public class DruidExpression
   {
     return new DruidExpression(
         simpleExtraction == null ? null : 
extractionMap.apply(simpleExtraction),
-        expressionMap.apply(expression)
+        expressionMap.apply(expression),
+        null
     );
   }
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/MultiValueStringOperatorConversions.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/MultiValueStringOperatorConversions.java
index cb6dcfd..6b5e5fb 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/MultiValueStringOperatorConversions.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/MultiValueStringOperatorConversions.java
@@ -19,6 +19,9 @@
 
 package org.apache.druid.sql.calcite.expression.builtin;
 
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlOperator;
@@ -26,8 +29,20 @@ import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.Parser;
+import org.apache.druid.query.expression.ExprUtils;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.virtual.ListFilteredVirtualColumn;
 import org.apache.druid.sql.calcite.expression.AliasedOperatorConversion;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.expression.OperatorConversions;
+import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+
+import javax.annotation.Nullable;
+import java.util.List;
 
 /**
  * Array functions which return an array, but are used in a multi-valued 
string dimension context instead will output
@@ -290,6 +305,126 @@ public class MultiValueStringOperatorConversions
     }
   }
 
+  private abstract static class ListFilter implements SqlOperatorConversion
+  {
+    abstract boolean isAllowList();
+
+    @Nullable
+    @Override
+    public DruidExpression toDruidExpression(
+        PlannerContext plannerContext,
+        RowSignature rowSignature,
+        RexNode rexNode
+    )
+    {
+      final RexCall call = (RexCall) rexNode;
+
+      final List<DruidExpression> druidExpressions = 
Expressions.toDruidExpressions(
+          plannerContext,
+          rowSignature,
+          call.getOperands()
+      );
+
+      if (druidExpressions == null || druidExpressions.size() != 2) {
+        return null;
+      }
+
+      Expr expr = Parser.parse(druidExpressions.get(1).getExpression(), 
plannerContext.getExprMacroTable());
+      // the right expression must be a literal array for this to work, since 
we need the values of the column
+      if (!expr.isLiteral()) {
+        return null;
+      }
+      String[] lit = expr.eval(ExprUtils.nilBindings()).asStringArray();
+      if (lit == null || lit.length == 0) {
+        return null;
+      }
+
+      final StringBuilder builder;
+      if (isAllowList()) {
+        builder = new StringBuilder("filter((x) -> array_contains(");
+      } else {
+        builder = new StringBuilder("filter((x) -> !array_contains(");
+      }
+
+      builder.append(druidExpressions.get(1).getExpression())
+             .append(", x), ")
+             .append(druidExpressions.get(0).getExpression())
+             .append(")");
+
+      if (druidExpressions.get(0).isSimpleExtraction()) {
+        return DruidExpression.forVirtualColumn(
+            builder.toString(),
+            (name, outputType, macroTable) -> new ListFilteredVirtualColumn(
+                name,
+                
druidExpressions.get(0).getSimpleExtraction().toDimensionSpec(druidExpressions.get(0).getDirectColumn(),
 outputType),
+                ImmutableSet.copyOf(lit),
+                isAllowList()
+            )
+        );
+      }
+
+      return DruidExpression.fromExpression(builder.toString());
+    }
+  }
+
+  public static class FilterOnly extends ListFilter
+  {
+    private static final SqlFunction SQL_FUNCTION = OperatorConversions
+        .operatorBuilder("MV_FILTER_ONLY")
+        .operandTypeChecker(
+            OperandTypes.sequence(
+                "(string,array)",
+                OperandTypes.family(SqlTypeFamily.STRING),
+                OperandTypes.family(SqlTypeFamily.ARRAY)
+            )
+        )
+        .functionCategory(SqlFunctionCategory.STRING)
+        .returnTypeCascadeNullable(SqlTypeName.VARCHAR)
+        .build();
+
+    @Override
+    public SqlOperator calciteOperator()
+    {
+      return SQL_FUNCTION;
+    }
+
+
+    @Override
+    boolean isAllowList()
+    {
+      return true;
+    }
+  }
+
+  public static class FilterNone extends ListFilter
+  {
+    private static final SqlFunction SQL_FUNCTION = OperatorConversions
+        .operatorBuilder("MV_FILTER_NONE")
+        .operandTypeChecker(
+            OperandTypes.sequence(
+                "(string,array)",
+                OperandTypes.family(SqlTypeFamily.STRING),
+                OperandTypes.family(SqlTypeFamily.ARRAY)
+            )
+        )
+        .functionCategory(SqlFunctionCategory.STRING)
+        .returnTypeCascadeNullable(SqlTypeName.VARCHAR)
+        .build();
+
+    @Override
+    public SqlOperator calciteOperator()
+    {
+      return SQL_FUNCTION;
+    }
+
+
+    @Override
+    boolean isAllowList()
+    {
+      return false;
+    }
+  }
+
   private MultiValueStringOperatorConversions()
   {
     // no instantiation
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
index 0f52c34..f5d9edc 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
@@ -238,6 +238,8 @@ public class DruidOperatorTable implements SqlOperatorTable
                    .add(new MultiValueStringOperatorConversions.Slice())
                    .add(new 
MultiValueStringOperatorConversions.MultiStringToString())
                    .add(new 
MultiValueStringOperatorConversions.StringToMultiString())
+                   .add(new MultiValueStringOperatorConversions.FilterOnly())
+                   .add(new MultiValueStringOperatorConversions.FilterNone())
                    .build();
 
   private static final List<SqlOperatorConversion> 
REDUCTION_OPERATOR_CONVERSIONS =
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteMultiValueStringQueryTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteMultiValueStringQueryTest.java
index 6225468..f2359c4 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteMultiValueStringQueryTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteMultiValueStringQueryTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.sql.calcite;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import junitparams.JUnitParamsRunner;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -28,6 +29,7 @@ import 
org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.filter.AndDimFilter;
 import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.query.filter.LikeDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
@@ -35,6 +37,7 @@ import 
org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.virtual.ListFilteredVirtualColumn;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.junit.Test;
@@ -933,4 +936,327 @@ public class CalciteMultiValueStringQueryTest extends 
BaseCalciteQueryTest
         results
     );
   }
+
+
+  @Test
+  public void testMultiValueListFilter() throws Exception
+  {
+    // Cannot vectorize due to usage of expressions.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT MV_FILTER_ONLY(dim3, ARRAY['b']), SUM(cnt) FROM druid.numfoo 
GROUP BY 1 ORDER BY 2 DESC",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE3)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ListFilteredVirtualColumn(
+                                "v0",
+                                DefaultDimensionSpec.of("dim3"),
+                                ImmutableSet.of("b"),
+                                true
+                            )
+                        )
+                        .setDimensions(
+                            dimensions(
+                                new DefaultDimensionSpec("v0", "_d0", 
ValueType.STRING)
+                            )
+                        )
+                        .setAggregatorSpecs(aggregators(new 
LongSumAggregatorFactory("a0", "cnt")))
+                        .setLimitSpec(new DefaultLimitSpec(
+                            ImmutableList.of(new OrderByColumnSpec(
+                                "a0",
+                                OrderByColumnSpec.Direction.DESCENDING,
+                                StringComparators.NUMERIC
+                            )),
+                            Integer.MAX_VALUE
+                        ))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{NullHandling.defaultStringValue(), 4L},
+            new Object[]{"b", 2L}
+        )
+    );
+  }
+
+  @Test
+  public void testMultiValueListFilterDeny() throws Exception
+  {
+    // Cannot vectorize due to usage of expressions.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT MV_FILTER_NONE(dim3, ARRAY['b']), SUM(cnt) FROM druid.numfoo 
GROUP BY 1 ORDER BY 2 DESC",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE3)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ListFilteredVirtualColumn(
+                                "v0",
+                                DefaultDimensionSpec.of("dim3"),
+                                ImmutableSet.of("b"),
+                                false
+                            )
+                        )
+                        .setDimensions(
+                            dimensions(
+                                new DefaultDimensionSpec("v0", "_d0", 
ValueType.STRING)
+                            )
+                        )
+                        .setAggregatorSpecs(aggregators(new 
LongSumAggregatorFactory("a0", "cnt")))
+                        .setLimitSpec(new DefaultLimitSpec(
+                            ImmutableList.of(new OrderByColumnSpec(
+                                "a0",
+                                OrderByColumnSpec.Direction.DESCENDING,
+                                StringComparators.NUMERIC
+                            )),
+                            Integer.MAX_VALUE
+                        ))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        useDefault ?
+        ImmutableList.of(
+            new Object[]{NullHandling.defaultStringValue(), 3L},
+            new Object[]{"a", 1L},
+            new Object[]{"c", 1L},
+            new Object[]{"d", 1L}
+        ) :
+        ImmutableList.of(
+            new Object[]{NullHandling.defaultStringValue(), 2L},
+            new Object[]{"", 1L},
+            new Object[]{"a", 1L},
+            new Object[]{"c", 1L},
+            new Object[]{"d", 1L}
+        )
+    );
+  }
+
+  @Test
+  public void testMultiValueListFilterComposed() throws Exception
+  {
+    // Cannot vectorize due to usage of expressions.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT MV_LENGTH(MV_FILTER_ONLY(dim3, ARRAY['b'])), SUM(cnt) FROM 
druid.numfoo GROUP BY 1 ORDER BY 2 DESC",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE3)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            expressionVirtualColumn(
+                                "v0",
+                                "array_length(filter((x) -> 
array_contains(array('b'), x), \"dim3\"))",
+                                ValueType.LONG
+                            )
+                        )
+                        .setDimensions(
+                            dimensions(
+                                new DefaultDimensionSpec("v0", "_d0", 
ValueType.LONG)
+                            )
+                        )
+                        .setAggregatorSpecs(aggregators(new 
LongSumAggregatorFactory("a0", "cnt")))
+                        .setLimitSpec(new DefaultLimitSpec(
+                            ImmutableList.of(new OrderByColumnSpec(
+                                "a0",
+                                OrderByColumnSpec.Direction.DESCENDING,
+                                StringComparators.NUMERIC
+                            )),
+                            Integer.MAX_VALUE
+                        ))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{0, 4L},
+            new Object[]{1, 2L}
+        )
+    );
+  }
+
+  @Test
+  public void testMultiValueListFilterComposedDeny() throws Exception
+  {
+    // Cannot vectorize due to usage of expressions.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT MV_LENGTH(MV_FILTER_NONE(dim3, ARRAY['b'])), SUM(cnt) FROM 
druid.numfoo GROUP BY 1 ORDER BY 2 DESC",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE3)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            expressionVirtualColumn(
+                                "v0",
+                                "array_length(filter((x) -> 
!array_contains(array('b'), x), \"dim3\"))",
+                                ValueType.LONG
+                            )
+                        )
+                        .setDimensions(
+                            dimensions(
+                                new DefaultDimensionSpec("v0", "_d0", 
ValueType.LONG)
+                            )
+                        )
+                        .setAggregatorSpecs(aggregators(new 
LongSumAggregatorFactory("a0", "cnt")))
+                        .setLimitSpec(new DefaultLimitSpec(
+                            ImmutableList.of(new OrderByColumnSpec(
+                                "a0",
+                                OrderByColumnSpec.Direction.DESCENDING,
+                                StringComparators.NUMERIC
+                            )),
+                            Integer.MAX_VALUE
+                        ))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        useDefault ? ImmutableList.of(new Object[]{1, 6L}) : 
ImmutableList.of(new Object[]{1, 4L}, new Object[]{0, 2L})
+    );
+  }
+
+  @Test
+  public void testFilterOnMultiValueListFilterNoMatch() throws Exception
+  {
+    // Cannot vectorize due to usage of expressions.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT dim3, SUM(cnt) FROM druid.numfoo WHERE MV_FILTER_ONLY(dim3, 
ARRAY['b']) = 'a' GROUP BY 1 ORDER BY 2 DESC",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE3)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ListFilteredVirtualColumn(
+                                "v0",
+                                DefaultDimensionSpec.of("dim3"),
+                                ImmutableSet.of("b"),
+                                true
+                            )
+                        )
+                        .setDimFilter(selector("v0", "a", null))
+                        .setDimensions(
+                            dimensions(
+                                new DefaultDimensionSpec("dim3", "_d0", 
ValueType.STRING)
+                            )
+                        )
+                        .setAggregatorSpecs(aggregators(new 
LongSumAggregatorFactory("a0", "cnt")))
+                        .setLimitSpec(new DefaultLimitSpec(
+                            ImmutableList.of(new OrderByColumnSpec(
+                                "a0",
+                                OrderByColumnSpec.Direction.DESCENDING,
+                                StringComparators.NUMERIC
+                            )),
+                            Integer.MAX_VALUE
+                        ))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of()
+    );
+  }
+
+  @Test
+  public void testFilterOnMultiValueListFilterMatch() throws Exception
+  {
+    // Cannot vectorize due to usage of expressions.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT dim3, SUM(cnt) FROM druid.numfoo WHERE MV_FILTER_ONLY(dim3, 
ARRAY['b']) = 'b' GROUP BY 1 ORDER BY 2 DESC",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE3)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ListFilteredVirtualColumn(
+                                "v0",
+                                DefaultDimensionSpec.of("dim3"),
+                                ImmutableSet.of("b"),
+                                true
+                            )
+                        )
+                        .setDimFilter(selector("v0", "b", null))
+                        .setDimensions(
+                            dimensions(
+                                new DefaultDimensionSpec("dim3", "_d0", 
ValueType.STRING)
+                            )
+                        )
+                        .setAggregatorSpecs(aggregators(new 
LongSumAggregatorFactory("a0", "cnt")))
+                        .setLimitSpec(new DefaultLimitSpec(
+                            ImmutableList.of(new OrderByColumnSpec(
+                                "a0",
+                                OrderByColumnSpec.Direction.DESCENDING,
+                                StringComparators.NUMERIC
+                            )),
+                            Integer.MAX_VALUE
+                        ))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"b", 2L},
+            new Object[]{"a", 1L},
+            new Object[]{"c", 1L}
+        )
+    );
+  }
+
+  @Test
+  public void testFilterOnMultiValueListFilterMatchLike() throws Exception
+  {
+    // Cannot vectorize due to usage of expressions.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT dim3, SUM(cnt) FROM druid.numfoo WHERE MV_FILTER_ONLY(dim3, 
ARRAY['b']) LIKE 'b%' GROUP BY 1 ORDER BY 2 DESC",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE3)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ListFilteredVirtualColumn(
+                                "v0",
+                                DefaultDimensionSpec.of("dim3"),
+                                ImmutableSet.of("b"),
+                                true
+                            )
+                        )
+                        .setDimFilter(new LikeDimFilter("v0", "b%", null, 
null))
+                        .setDimensions(
+                            dimensions(
+                                new DefaultDimensionSpec("dim3", "_d0", 
ValueType.STRING)
+                            )
+                        )
+                        .setAggregatorSpecs(aggregators(new 
LongSumAggregatorFactory("a0", "cnt")))
+                        .setLimitSpec(new DefaultLimitSpec(
+                            ImmutableList.of(new OrderByColumnSpec(
+                                "a0",
+                                OrderByColumnSpec.Direction.DESCENDING,
+                                StringComparators.NUMERIC
+                            )),
+                            Integer.MAX_VALUE
+                        ))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"b", 2L},
+            new Object[]{"a", 1L},
+            new Object[]{"c", 1L}
+        )
+    );
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to