This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/31.0.0 by this push:
     new a0c842e98b5 Create a FilterBundle.Builder class and use it to 
construct FilterBundle. (#17055) (#17159)
a0c842e98b5 is described below

commit a0c842e98b53de8948cd9b837ba69167dd48d3bc
Author: Cece Mei <[email protected]>
AuthorDate: Wed Sep 25 18:04:32 2024 -0700

    Create a FilterBundle.Builder class and use it to construct FilterBundle. 
(#17055) (#17159)
---
 .../java/org/apache/druid/query/QueryContexts.java |    1 +
 .../java/org/apache/druid/query/filter/Filter.java |   47 +-
 .../apache/druid/query/filter/FilterBundle.java    |  177 +++-
 .../apache/druid/query/filter/FilterTuning.java    |    2 +-
 .../druid/segment/QueryableIndexCursorHolder.java  |   13 +-
 .../org/apache/druid/segment/filter/AndFilter.java |  162 +--
 .../org/apache/druid/segment/filter/OrFilter.java  | 1115 ++++++++++----------
 .../druid/segment/index/BitmapColumnIndex.java     |   10 +-
 .../druid/segment/filter/FilterBundleTest.java     |   17 +-
 9 files changed, 831 insertions(+), 713 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java 
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index afdc5a552f0..ced9f0d4e2d 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -63,6 +63,7 @@ public class QueryContexts
   public static final String REWRITE_JOIN_TO_FILTER_ENABLE_KEY = 
"enableRewriteJoinToFilter";
   public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = 
"joinFilterRewriteMaxSize";
   public static final String MAX_NUMERIC_IN_FILTERS = "maxNumericInFilters";
+  public static final String CURSOR_AUTO_ARRANGE_FILTERS = 
"cursorAutoArrangeFilters";
   // This flag controls whether a SQL join query with left scan should be 
attempted to be run as direct table access
   // instead of being wrapped inside a query. With direct table access 
enabled, Druid can push down the join operation to
   // data servers.
diff --git a/processing/src/main/java/org/apache/druid/query/filter/Filter.java 
b/processing/src/main/java/org/apache/druid/query/filter/Filter.java
index f30681c8669..147406b2261 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/Filter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/Filter.java
@@ -47,27 +47,26 @@ public interface Filter
    * cursor. If both are set, the cursor will effectively perform a logical 
AND to combine them.
    * See {@link FilterBundle} for additional details.
    *
-   * @param columnIndexSelector - provides {@link 
org.apache.druid.segment.column.ColumnIndexSupplier} to fetch column
-   *                              indexes and {@link 
org.apache.druid.collections.bitmap.BitmapFactory} to manipulate
-   *                              them
-   * @param bitmapResultFactory - wrapper for {@link ImmutableBitmap} 
operations to tie into
-   *                              {@link org.apache.druid.query.QueryMetrics} 
and build the output indexes
-   * @param applyRowCount       - upper bound on number of rows this filter 
would be applied to, after removing rows
-   *                              short-circuited by prior bundle operations. 
For example, given "x AND y", if "x" is
-   *                              resolved using an index, then "y" will 
receive the number of rows that matched
-   *                              the filter "x". As another example, given "x 
OR y", if "x" is resolved using an
-   *                              index, then "y" will receive the number of 
rows that did *not* match the filter "x".
-   * @param totalRowCount       - total number of rows to be scanned if no 
indexes are applied
-   * @param includeUnknown      - mapping for Druid native two state logic 
system into SQL three-state logic system. If
-   *                              set to true, bitmaps returned by this method 
should include true bits for any rows
-   *                              where the matching result is 'unknown', such 
as from the input being null valued.
-   *                              See {@link NullHandling#useThreeValueLogic()}
-   * @return                    - {@link FilterBundle} containing any indexes 
and/or matchers that are needed to build
-   *                              a cursor
-   * @param <T>                 - Type of {@link BitmapResultFactory} results, 
{@link ImmutableBitmap} by default
+   * @param filterBundleBuilder contains {@link BitmapColumnIndex} and {@link 
ColumnIndexSelector}, and some additional
+   *                            info needed.
+   * @param bitmapResultFactory wrapper for {@link ImmutableBitmap} operations 
to tie into
+   *                            {@link org.apache.druid.query.QueryMetrics} 
and build the output indexes
+   * @param applyRowCount       upper bound on number of rows this filter 
would be applied to, after removing rows
+   *                            short-circuited by prior bundle operations. 
For example, given "x AND y", if "x" is
+   *                            resolved using an index, then "y" will receive 
the number of rows that matched
+   *                            the filter "x". As another example, given "x 
OR y", if "x" is resolved using an
+   *                            index, then "y" will receive the number of 
rows that did *not* match the filter "x".
+   * @param totalRowCount       total number of rows to be scanned if no 
indexes are applied
+   * @param includeUnknown      mapping for Druid native two state logic 
system into SQL three-state logic system. If
+   *                            set to true, bitmaps returned by this method 
should include true bits for any rows
+   *                            where the matching result is 'unknown', such 
as from the input being null valued.
+   *                            See {@link NullHandling#useThreeValueLogic()}
+   * @param <T>                 type of {@link BitmapResultFactory} results, 
{@link ImmutableBitmap} by default
+   * @return {@link FilterBundle} containing any indexes and/or matchers that 
are needed to build
+   * a cursor
    */
   default <T> FilterBundle makeFilterBundle(
-      ColumnIndexSelector columnIndexSelector,
+      FilterBundle.Builder filterBundleBuilder,
       BitmapResultFactory<T> bitmapResultFactory,
       int applyRowCount,
       int totalRowCount,
@@ -76,7 +75,7 @@ public interface Filter
   {
     final FilterBundle.IndexBundle indexBundle;
     final boolean needMatcher;
-    final BitmapColumnIndex columnIndex = 
getBitmapColumnIndex(columnIndexSelector);
+    final BitmapColumnIndex columnIndex = 
filterBundleBuilder.getBitmapColumnIndex();
     if (columnIndex != null) {
       final long bitmapConstructionStartNs = System.nanoTime();
       final T result = columnIndex.computeBitmapResult(
@@ -107,7 +106,7 @@ public interface Filter
           new FilterBundle.MatcherBundleInfo(this::toString, null, null),
           this::makeMatcher,
           this::makeVectorMatcher,
-          this.canVectorizeMatcher(columnIndexSelector)
+          
this.canVectorizeMatcher(filterBundleBuilder.getColumnIndexSelector())
       );
     } else {
       matcherBundle = null;
@@ -122,7 +121,6 @@ public interface Filter
    * examine details about the index prior to computing it, via {@link 
BitmapColumnIndex#getIndexCapabilities()}.
    *
    * @param selector Object used to create BitmapColumnIndex
-   *
    * @return BitmapColumnIndex that can build ImmutableBitmap of matched row 
numbers
    */
   @Nullable
@@ -132,7 +130,6 @@ public interface Filter
    * Get a {@link ValueMatcher} that applies this filter to row values.
    *
    * @param factory Object used to create ValueMatchers
-   *
    * @return ValueMatcher that applies this filter to row values.
    */
   ValueMatcher makeMatcher(ColumnSelectorFactory factory);
@@ -141,7 +138,6 @@ public interface Filter
    * Get a {@link VectorValueMatcher} that applies this filter to row vectors.
    *
    * @param factory Object used to create ValueMatchers
-   *
    * @return VectorValueMatcher that applies this filter to row vectors.
    */
   default VectorValueMatcher makeVectorMatcher(VectorColumnSelectorFactory 
factory)
@@ -151,6 +147,7 @@ public interface Filter
 
   /**
    * Returns true if this filter can produce a vectorized matcher from its 
"makeVectorMatcher" method.
+   *
    * @param inspector Supplies type information for the selectors this filter 
will match against
    */
   default boolean canVectorizeMatcher(ColumnInspector inspector)
@@ -176,7 +173,7 @@ public interface Filter
    * Return a copy of this filter that is identical to the this filter except 
that it operates on different columns,
    * based on a renaming map where the key is the column to be renamed in the 
filter, and the value is the new
    * column name.
-   *
+   * <p>
    * For example, if I have a filter (A = hello), and I have a renaming map (A 
-> B),
    * this should return the filter (B = hello)
    *
diff --git 
a/processing/src/main/java/org/apache/druid/query/filter/FilterBundle.java 
b/processing/src/main/java/org/apache/druid/query/filter/FilterBundle.java
index e105b0d6163..8511642a0c4 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/FilterBundle.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/FilterBundle.java
@@ -24,16 +24,21 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.query.filter.vector.VectorValueMatcher;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.column.ColumnIndexCapabilities;
 import org.apache.druid.segment.column.SimpleColumnIndexCapabilities;
 import org.apache.druid.segment.data.Offset;
 import org.apache.druid.segment.filter.FalseFilter;
+import org.apache.druid.segment.index.BitmapColumnIndex;
 import org.apache.druid.segment.vector.ReadableVectorOffset;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -57,27 +62,12 @@ import java.util.regex.Pattern;
  */
 public class FilterBundle
 {
-  public static FilterBundle allFalse(long constructionTime, ImmutableBitmap 
emptyBitmap)
-  {
-    return new FilterBundle(
-        new FilterBundle.SimpleIndexBundle(
-            new FilterBundle.IndexBundleInfo(() -> 
FalseFilter.instance().toString(), 0, constructionTime, null),
-            emptyBitmap,
-            SimpleColumnIndexCapabilities.getConstant()
-        ),
-        null
-    );
-  }
-
   @Nullable
   private final IndexBundle indexBundle;
   @Nullable
   private final MatcherBundle matcherBundle;
 
-  public FilterBundle(
-      @Nullable IndexBundle index,
-      @Nullable MatcherBundle matcherBundle
-  )
+  public FilterBundle(@Nullable IndexBundle index, @Nullable MatcherBundle 
matcherBundle)
   {
     Preconditions.checkArgument(
         index != null || matcherBundle != null,
@@ -87,6 +77,17 @@ public class FilterBundle
     this.matcherBundle = matcherBundle;
   }
 
+  public static FilterBundle allFalse(long constructionTime, ImmutableBitmap 
emptyBitmap)
+  {
+    return new FilterBundle(
+        new FilterBundle.SimpleIndexBundle(
+            new FilterBundle.IndexBundleInfo(() -> 
FalseFilter.instance().toString(), 0, constructionTime, null),
+            emptyBitmap,
+            SimpleColumnIndexCapabilities.getConstant()
+        ),
+        null
+    );
+  }
 
   @Nullable
   public IndexBundle getIndex()
@@ -151,6 +152,95 @@ public class FilterBundle
     boolean canVectorize();
   }
 
+  /**
+   * Wraps info needed to build a {@link FilterBundle}, and provides an 
estimated compute cost for
+   * {@link BitmapColumnIndex#computeBitmapResult}.
+   */
+  public static class Builder
+  {
+    private final Filter filter;
+    private final ColumnIndexSelector columnIndexSelector;
+    @Nullable
+    private final BitmapColumnIndex bitmapColumnIndex;
+    private final List<FilterBundle.Builder> childBuilders;
+    private final int estimatedIndexComputeCost;
+
+    public Builder(Filter filter, ColumnIndexSelector columnIndexSelector, 
boolean cursorAutoArrangeFilters)
+    {
+      this.filter = filter;
+      this.columnIndexSelector = columnIndexSelector;
+      this.bitmapColumnIndex = 
filter.getBitmapColumnIndex(columnIndexSelector);
+      // Construct Builder instances for all child filters recursively.
+      if (filter instanceof BooleanFilter) {
+        Collection<Filter> childFilters = ((BooleanFilter) 
filter).getFilters();
+        this.childBuilders = new ArrayList<>(childFilters.size());
+        for (Filter childFilter : childFilters) {
+          this.childBuilders.add(new FilterBundle.Builder(childFilter, 
columnIndexSelector, cursorAutoArrangeFilters));
+        }
+      } else {
+        this.childBuilders = new ArrayList<>(0);
+      }
+      if (cursorAutoArrangeFilters) {
+        // Sort child builders by cost in ASCENDING order, should be stable by 
default.
+        
this.childBuilders.sort(Comparator.comparingInt(FilterBundle.Builder::getEstimatedIndexComputeCost));
+        this.estimatedIndexComputeCost = calculateEstimatedIndexComputeCost();
+      } else {
+        this.estimatedIndexComputeCost = Integer.MAX_VALUE;
+      }
+    }
+
+    private int calculateEstimatedIndexComputeCost()
+    {
+      if (this.bitmapColumnIndex == null) {
+        return Integer.MAX_VALUE;
+      }
+      int cost = this.bitmapColumnIndex.estimatedComputeCost();
+      if (cost == Integer.MAX_VALUE) {
+        return Integer.MAX_VALUE;
+      }
+
+      for (FilterBundle.Builder childBuilder : childBuilders) {
+        int childCost = childBuilder.getEstimatedIndexComputeCost();
+        if (childCost >= Integer.MAX_VALUE - cost) {
+          return Integer.MAX_VALUE;
+        }
+        cost += childCost;
+      }
+      return cost;
+    }
+
+    public ColumnIndexSelector getColumnIndexSelector()
+    {
+      return columnIndexSelector;
+    }
+
+    @Nullable
+    public BitmapColumnIndex getBitmapColumnIndex()
+    {
+      return bitmapColumnIndex;
+    }
+
+    public List<FilterBundle.Builder> getChildBuilders()
+    {
+      return childBuilders;
+    }
+
+    public int getEstimatedIndexComputeCost()
+    {
+      return estimatedIndexComputeCost;
+    }
+
+    public <T> FilterBundle build(
+        BitmapResultFactory<T> bitmapResultFactory,
+        int applyRowCount,
+        int totalRowCount,
+        boolean includeUnknown
+    )
+    {
+      return filter.makeFilterBundle(this, bitmapResultFactory, applyRowCount, 
totalRowCount, includeUnknown);
+    }
+  }
+
   public static class SimpleIndexBundle implements IndexBundle
   {
     private final IndexBundleInfo info;
@@ -211,11 +301,7 @@ public class FilterBundle
     }
 
     @Override
-    public ValueMatcher valueMatcher(
-        ColumnSelectorFactory selectorFactory,
-        Offset baseOffset,
-        boolean descending
-    )
+    public ValueMatcher valueMatcher(ColumnSelectorFactory selectorFactory, 
Offset baseOffset, boolean descending)
     {
       return matcherFn.apply(selectorFactory);
     }
@@ -339,12 +425,11 @@ public class FilterBundle
      */
     public String describe()
     {
-      final StringBuilder sb = new StringBuilder()
-          .append("index: ")
-          .append(filter.get())
-          .append(" (selectionSize = ")
-          .append(selectionSize)
-          .append(")\n");
+      final StringBuilder sb = new StringBuilder().append("index: ")
+                                                  .append(filter.get())
+                                                  .append(" (selectionSize = ")
+                                                  .append(selectionSize)
+                                                  .append(")\n");
 
       if (indexes != null) {
         for (final IndexBundleInfo info : indexes) {
@@ -358,23 +443,26 @@ public class FilterBundle
     @Override
     public String toString()
     {
-      return "{" +
-             "filter=\"" + filter.get() + '\"' +
-             ", selectionSize=" + selectionSize +
-             ", buildTime=" + TimeUnit.NANOSECONDS.toMicros(buildTimeNs) + 
"μs" +
-             (indexes != null ? ", indexes=" + indexes : "") +
-             '}';
+      return "{"
+             + "filter=\""
+             + filter.get()
+             + '\"'
+             + ", selectionSize="
+             + selectionSize
+             + ", buildTime="
+             + TimeUnit.NANOSECONDS.toMicros(buildTimeNs)
+             + "μs"
+             + (indexes != null ? ", indexes=" + indexes : "")
+             + '}';
     }
   }
 
   public static class MatcherBundleInfo
   {
     private static final Pattern PATTERN_LINE_START = Pattern.compile("(?m)^");
-
-    private final Supplier<String> filter;
     @Nullable
     final List<MatcherBundleInfo> matchers;
-
+    private final Supplier<String> filter;
     @Nullable
     private final IndexBundleInfo partialIndex;
 
@@ -415,10 +503,7 @@ public class FilterBundle
      */
     public String describe()
     {
-      final StringBuilder sb = new StringBuilder()
-          .append("matcher: ")
-          .append(filter.get())
-          .append("\n");
+      final StringBuilder sb = new StringBuilder().append("matcher: 
").append(filter.get()).append("\n");
 
       if (partialIndex != null) {
         sb.append("  with partial ")
@@ -437,11 +522,13 @@ public class FilterBundle
     @Override
     public String toString()
     {
-      return "{" +
-             "filter=\"" + filter.get() + '\"' +
-             (partialIndex != null ? ", partialIndex=" + partialIndex : "") +
-             (matchers != null ? ", matchers=" + matchers : "") +
-             '}';
+      return "{"
+             + "filter=\""
+             + filter.get()
+             + '\"'
+             + (partialIndex != null ? ", partialIndex=" + partialIndex : "")
+             + (matchers != null ? ", matchers=" + matchers : "")
+             + '}';
     }
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/filter/FilterTuning.java 
b/processing/src/main/java/org/apache/druid/query/filter/FilterTuning.java
index 831d50261e2..892192128e0 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/FilterTuning.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/FilterTuning.java
@@ -30,7 +30,7 @@ import java.util.Objects;
 
 /**
  * This class provides a mechanism to influence whether or not indexes are 
used for a {@link Filter} during processing
- * by {@link Filter#makeFilterBundle(ColumnIndexSelector, BitmapResultFactory, 
int, int, boolean)}
+ * by {@link Filter#makeFilterBundle(FilterBundle.Builder, 
BitmapResultFactory, int, int, boolean)}
  * (i.e. will a {@link Filter} be a "pre" filter in which we union indexes for 
all values that match the filter to
  * create a {@link org.apache.druid.segment.BitmapOffset}/{@link 
org.apache.druid.segment.vector.BitmapVectorOffset},
  * or will it be used as a "post" filter and evaluated while scanning row 
values from the
diff --git 
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorHolder.java
 
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorHolder.java
index 5188b385b36..5aa4dbed8cc 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorHolder.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorHolder.java
@@ -34,6 +34,7 @@ import org.apache.druid.query.Order;
 import org.apache.druid.query.OrderBy;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.filter.Filter;
@@ -112,6 +113,7 @@ public class QueryableIndexCursorHolder implements 
CursorHolder
             Cursors.getTimeOrdering(ordering),
             interval,
             filter,
+            
cursorBuildSpec.getQueryContext().getBoolean(QueryContexts.CURSOR_AUTO_ARRANGE_FILTERS,
 false),
             metrics
         )
     );
@@ -346,7 +348,6 @@ public class QueryableIndexCursorHolder implements 
CursorHolder
    * @param timestamp  the timestamp to search for
    * @param startIndex first index to search, inclusive
    * @param endIndex   last index to search, exclusive
-   *
    * @return first index that has a timestamp equal to, or greater, than 
"timestamp"
    */
   @VisibleForTesting
@@ -665,6 +666,7 @@ public class QueryableIndexCursorHolder implements 
CursorHolder
         Order timeOrder,
         Interval interval,
         @Nullable Filter filter,
+        boolean cursorAutoArrangeFilters,
         @Nullable QueryMetrics<? extends Query<?>> metrics
     )
     {
@@ -688,6 +690,7 @@ public class QueryableIndexCursorHolder implements 
CursorHolder
                 interval,
                 filter
             ),
+            cursorAutoArrangeFilters,
             bitmapIndexSelector,
             numRows,
             metrics
@@ -708,13 +711,14 @@ public class QueryableIndexCursorHolder implements 
CursorHolder
 
   /**
    * Create a {@link FilterBundle} for a cursor hold instance.
-   *
+   * <p>
    * The provided filter must include the query-level interface if needed. To 
compute this properly, use
    * {@link #computeFilterWithIntervalIfNeeded}.
    */
   @Nullable
   private static FilterBundle makeFilterBundle(
       @Nullable final Filter filter,
+      boolean cursorAutoArrangeFilters,
       final ColumnSelectorColumnIndexSelector bitmapIndexSelector,
       final int numRows,
       @Nullable final QueryMetrics<?> metrics
@@ -732,8 +736,11 @@ public class QueryableIndexCursorHolder implements 
CursorHolder
       return null;
     }
     final long bitmapConstructionStartNs = System.nanoTime();
-    final FilterBundle filterBundle = filter.makeFilterBundle(
+    final FilterBundle filterBundle = new FilterBundle.Builder(
+        filter,
         bitmapIndexSelector,
+        cursorAutoArrangeFilters
+    ).build(
         bitmapResultFactory,
         numRows,
         numRows,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/AndFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/AndFilter.java
index dfc618acad8..c4172fc6ce2 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/AndFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/AndFilter.java
@@ -72,9 +72,68 @@ public class AndFilter implements BooleanFilter
     this(new LinkedHashSet<>(filters));
   }
 
+  public static ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers)
+  {
+    Preconditions.checkState(baseMatchers.length > 0);
+    if (baseMatchers.length == 1) {
+      return baseMatchers[0];
+    }
+
+    return new ValueMatcher()
+    {
+      @Override
+      public boolean matches(boolean includeUnknown)
+      {
+        for (ValueMatcher matcher : baseMatchers) {
+          if (!matcher.matches(includeUnknown)) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        inspector.visit("firstBaseMatcher", baseMatchers[0]);
+        inspector.visit("secondBaseMatcher", baseMatchers[1]);
+        // Don't inspect the 3rd and all consequent baseMatchers, cut runtime 
shape combinations at this point.
+        // Anyway if the filter is so complex, Hotspot won't inline all calls 
because of the inline limit.
+      }
+    };
+  }
+
+  public static VectorValueMatcher makeVectorMatcher(final 
VectorValueMatcher[] baseMatchers)
+  {
+    Preconditions.checkState(baseMatchers.length > 0);
+    if (baseMatchers.length == 1) {
+      return baseMatchers[0];
+    }
+
+    return new BaseVectorValueMatcher(baseMatchers[0])
+    {
+      @Override
+      public ReadableVectorMatch match(final ReadableVectorMatch mask, boolean 
includeUnknown)
+      {
+        ReadableVectorMatch match = mask;
+
+        for (VectorValueMatcher matcher : baseMatchers) {
+          if (match.isAllFalse()) {
+            // Short-circuit if the entire vector is false.
+            break;
+          }
+          match = matcher.match(match, includeUnknown);
+        }
+
+        assert match.isValid(mask);
+        return match;
+      }
+    };
+  }
+
   @Override
   public <T> FilterBundle makeFilterBundle(
-      ColumnIndexSelector columnIndexSelector,
+      FilterBundle.Builder filterBundleBuilder,
       BitmapResultFactory<T> bitmapResultFactory,
       int applyRowCount,
       int totalRowCount,
@@ -97,20 +156,21 @@ public class AndFilter implements BooleanFilter
     // a nested AND filter might also partition itself into indexes and 
bundles, and since it is part of a logical AND
     // operation, this is valid (and even preferable).
     final long bitmapConstructionStartNs = System.nanoTime();
-    for (Filter subfilter : filters) {
-      final FilterBundle subBundle = subfilter.makeFilterBundle(
-          columnIndexSelector,
+    for (FilterBundle.Builder subFilterBundleBuilder : 
filterBundleBuilder.getChildBuilders()) {
+      final FilterBundle subBundle = subFilterBundleBuilder.build(
           bitmapResultFactory,
           Math.min(applyRowCount, indexIntersectionSize),
           totalRowCount,
           includeUnknown
       );
-      if (subBundle.getIndex() != null) {
+      if (subBundle.hasIndex()) {
         if (subBundle.getIndex().getBitmap().isEmpty()) {
           // if nothing matches for any sub filter, short-circuit, because 
nothing can possibly match
           return FilterBundle.allFalse(
               System.nanoTime() - bitmapConstructionStartNs,
-              columnIndexSelector.getBitmapFactory().makeEmptyImmutableBitmap()
+              subFilterBundleBuilder.getColumnIndexSelector()
+                                    .getBitmapFactory()
+                                    .makeEmptyImmutableBitmap()
           );
         }
         merged = merged.merge(subBundle.getIndex().getIndexCapabilities());
@@ -122,7 +182,7 @@ public class AndFilter implements BooleanFilter
         }
         indexIntersectionSize = index.size();
       }
-      if (subBundle.getMatcherBundle() != null) {
+      if (subBundle.hasMatcher()) {
         matcherBundles.add(subBundle.getMatcherBundle());
         matcherBundleInfos.add(subBundle.getMatcherBundle().getMatcherInfo());
       }
@@ -131,11 +191,7 @@ public class AndFilter implements BooleanFilter
     final FilterBundle.IndexBundle indexBundle;
     if (index != null) {
       if (indexBundleInfos.size() == 1) {
-        indexBundle = new FilterBundle.SimpleIndexBundle(
-            indexBundleInfos.get(0),
-            index,
-            merged
-        );
+        indexBundle = new 
FilterBundle.SimpleIndexBundle(indexBundleInfos.get(0), index, merged);
       } else {
         indexBundle = new FilterBundle.SimpleIndexBundle(
             new FilterBundle.IndexBundleInfo(
@@ -162,11 +218,7 @@ public class AndFilter implements BooleanFilter
           if (matcherBundles.size() == 1) {
             return matcherBundleInfos.get(0);
           }
-          return new FilterBundle.MatcherBundleInfo(
-              () -> "AND",
-              null,
-              matcherBundleInfos
-          );
+          return new FilterBundle.MatcherBundleInfo(() -> "AND", null, 
matcherBundleInfos);
         }
 
         @Override
@@ -180,7 +232,10 @@ public class AndFilter implements BooleanFilter
         }
 
         @Override
-        public VectorValueMatcher vectorMatcher(VectorColumnSelectorFactory 
selectorFactory, ReadableVectorOffset baseOffset)
+        public VectorValueMatcher vectorMatcher(
+            VectorColumnSelectorFactory selectorFactory,
+            ReadableVectorOffset baseOffset
+        )
         {
           final VectorValueMatcher[] vectorMatchers = new 
VectorValueMatcher[matcherBundles.size()];
           for (int i = 0; i < matcherBundles.size(); i++) {
@@ -204,10 +259,7 @@ public class AndFilter implements BooleanFilter
       matcherBundle = null;
     }
 
-    return new FilterBundle(
-        indexBundle,
-        matcherBundle
-    );
+    return new FilterBundle(indexBundle, matcherBundle);
   }
 
   @Nullable
@@ -239,6 +291,13 @@ public class AndFilter implements BooleanFilter
         return finalMerged;
       }
 
+      @Override
+      public int estimatedComputeCost()
+      {
+        // There's no additional cost on AND filter, cost in child filters 
would be summed.
+        return 0;
+      }
+
       @Override
       public <T> T computeBitmapResult(BitmapResultFactory<T> 
bitmapResultFactory, boolean includeUnknown)
       {
@@ -350,65 +409,6 @@ public class AndFilter implements BooleanFilter
     return StringUtils.format("(%s)", AND_JOINER.join(filters));
   }
 
-  public static ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers)
-  {
-    Preconditions.checkState(baseMatchers.length > 0);
-    if (baseMatchers.length == 1) {
-      return baseMatchers[0];
-    }
-
-    return new ValueMatcher()
-    {
-      @Override
-      public boolean matches(boolean includeUnknown)
-      {
-        for (ValueMatcher matcher : baseMatchers) {
-          if (!matcher.matches(includeUnknown)) {
-            return false;
-          }
-        }
-        return true;
-      }
-
-      @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-      {
-        inspector.visit("firstBaseMatcher", baseMatchers[0]);
-        inspector.visit("secondBaseMatcher", baseMatchers[1]);
-        // Don't inspect the 3rd and all consequent baseMatchers, cut runtime 
shape combinations at this point.
-        // Anyway if the filter is so complex, Hotspot won't inline all calls 
because of the inline limit.
-      }
-    };
-  }
-
-  public static VectorValueMatcher makeVectorMatcher(final 
VectorValueMatcher[] baseMatchers)
-  {
-    Preconditions.checkState(baseMatchers.length > 0);
-    if (baseMatchers.length == 1) {
-      return baseMatchers[0];
-    }
-
-    return new BaseVectorValueMatcher(baseMatchers[0])
-    {
-      @Override
-      public ReadableVectorMatch match(final ReadableVectorMatch mask, boolean 
includeUnknown)
-      {
-        ReadableVectorMatch match = mask;
-
-        for (VectorValueMatcher matcher : baseMatchers) {
-          if (match.isAllFalse()) {
-            // Short-circuit if the entire vector is false.
-            break;
-          }
-          match = matcher.match(match, includeUnknown);
-        }
-
-        assert match.isValid(mask);
-        return match;
-      }
-    };
-  }
-
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/OrFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/OrFilter.java
index 700b2fbfa16..e8bdce85c9b 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/OrFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/OrFilter.java
@@ -78,421 +78,162 @@ public class OrFilter implements BooleanFilter
     this(new LinkedHashSet<>(filters));
   }
 
-  @Override
-  public <T> FilterBundle makeFilterBundle(
-      ColumnIndexSelector columnIndexSelector,
-      BitmapResultFactory<T> bitmapResultFactory,
-      int applyRowCount,
-      int totalRowCount,
-      boolean includeUnknown
-  )
+  private static ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers)
   {
-    // for OR filters, we have a few possible outcomes:
-    // 1 - all clauses are index only bundles. in this case we union the 
bitmaps together and make an index only bundle
-    // 2 - some clauses support indexes. in this case, we union the bitmaps of 
any index only bundles together to form a
-    //     partial index which is constructed into a matcher bundle with 
convertIndexToMatcherBundle. We translate any
-    //     index AND matcher bundles into a matcher only bundle with 
convertBundleToMatcherOnlyBundle. Finally, we
-    //     combine these with the remaining matcher only bundles to with 
makeMatcher/makeVectorMatcher to make a matcher
-    //     only bundle
-    // 3 - no clauses support indexes. in this case, we make a matcher only 
bundle using makeMatcher/makeVectorMatcher
-
-    final List<FilterBundle.IndexBundle> indexOnlyBundles = new ArrayList<>();
-    final List<FilterBundle.IndexBundleInfo> indexOnlyBundlesInfo = new 
ArrayList<>();
-    final List<FilterBundle.MatcherBundle> partialIndexBundles = new 
ArrayList<>();
-    final List<FilterBundle.MatcherBundle> matcherOnlyBundles = new 
ArrayList<>();
-
-    int indexUnionSize = 0;
-    ImmutableBitmap index = null;
-    ColumnIndexCapabilities merged = new SimpleColumnIndexCapabilities(true, 
true);
-    int emptyCount = 0;
+    Preconditions.checkState(baseMatchers.length > 0);
 
-    final long bitmapConstructionStartNs = System.nanoTime();
+    if (baseMatchers.length == 1) {
+      return baseMatchers[0];
+    }
 
-    for (Filter subfilter : filters) {
-      final FilterBundle bundle = subfilter.makeFilterBundle(
-          columnIndexSelector,
-          bitmapResultFactory,
-          Math.min(applyRowCount, totalRowCount - indexUnionSize),
-          totalRowCount,
-          includeUnknown
-      );
-      if (bundle.hasIndex()) {
-        final ImmutableBitmap bundleIndex = bundle.getIndex().getBitmap();
-        if (bundleIndex.isEmpty()) {
-          // we leave any indexes which are empty out of index, 
indexOnlyBundles, and partialIndexBundles
-          // even though we skip them, we still keep track of them to check 
for the case when we can build the OR into
-          // an index only bundle. We can count index and matcher bundles here 
too because the AND operation means that
-          // an empty index means the matcher can be skipped
-          emptyCount++;
-        } else {
-          if (bundle.hasMatcher()) {
-            // index and matcher bundles must be handled separately, they will 
need to be a single value matcher built
-            // by doing an AND operation between the index and the value 
matcher
-            // (a bundle is basically an AND operation between the index and 
matcher if the matcher is present)
-            partialIndexBundles.add(convertBundleToMatcherOnlyBundle(bundle, 
bundleIndex));
-          } else {
-            indexOnlyBundles.add(bundle.getIndex());
-            indexOnlyBundlesInfo.add(bundle.getIndex().getIndexInfo());
-            merged.merge(bundle.getIndex().getIndexCapabilities());
-            // union index only bitmaps together; if all sub-filters are 
'index only' bundles we will make an index only
-            // bundle ourselves, else we will use this index as a single value 
matcher
-            if (index == null) {
-              index = bundle.getIndex().getBitmap();
-            } else {
-              index = index.union(bundle.getIndex().getBitmap());
-            }
-            indexUnionSize = index.size();
+    return new ValueMatcher()
+    {
+      @Override
+      public boolean matches(boolean includeUnknown)
+      {
+        for (ValueMatcher matcher : baseMatchers) {
+          if (matcher.matches(includeUnknown)) {
+            return true;
           }
         }
-      } else {
-        matcherOnlyBundles.add(bundle.getMatcherBundle());
+        return false;
       }
-    }
-    final long totalBitmapConstructTimeNs = System.nanoTime() - 
bitmapConstructionStartNs;
-
 
-    // if all the filters are 'index only', we can make an index only bundle
-    if (indexOnlyBundles.size() + emptyCount == filters.size()) {
-      if (index == null || index.isEmpty()) {
-        return FilterBundle.allFalse(
-            totalBitmapConstructTimeNs,
-            columnIndexSelector.getBitmapFactory().makeEmptyImmutableBitmap()
-        );
-      }
-      if (indexOnlyBundles.size() == 1) {
-        return new FilterBundle(
-            indexOnlyBundles.get(0),
-            null
-        );
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        inspector.visit("firstBaseMatcher", baseMatchers[0]);
+        inspector.visit("secondBaseMatcher", baseMatchers[1]);
+        // Don't inspect the 3rd and all consequent baseMatchers, cut runtime 
shape combinations at this point.
+        // Anyway if the filter is so complex, Hotspot won't inline all calls 
because of the inline limit.
       }
-      return new FilterBundle(
-          new FilterBundle.SimpleIndexBundle(
-              new FilterBundle.IndexBundleInfo(
-                  () -> "OR",
-                  applyRowCount,
-                  totalBitmapConstructTimeNs,
-                  indexOnlyBundlesInfo
-              ),
-              index,
-              merged
-          ),
-          null
-      );
-    }
+    };
+  }
 
-    // if not the index only outcome, we build a matcher only bundle from all 
the matchers
-    final int estimatedSize = (indexOnlyBundles.isEmpty() ? 0 : 1)
-                              + partialIndexBundles.size()
-                              + matcherOnlyBundles.size();
-    final List<FilterBundle.MatcherBundle> allMatcherBundles = 
Lists.newArrayListWithCapacity(estimatedSize);
-    final List<FilterBundle.MatcherBundleInfo> allMatcherBundlesInfo = 
Lists.newArrayListWithCapacity(estimatedSize);
-    if (!indexOnlyBundles.isEmpty()) {
-      // translate the indexOnly bundles into a single matcher
-      final FilterBundle.MatcherBundle matcherBundle = 
convertIndexToMatcherBundle(
-          applyRowCount,
-          indexOnlyBundles,
-          indexOnlyBundlesInfo,
-          totalBitmapConstructTimeNs,
-          index
-      );
-      allMatcherBundles.add(matcherBundle);
-      allMatcherBundlesInfo.add(matcherBundle.getMatcherInfo());
-    }
-    for (FilterBundle.MatcherBundle bundle : partialIndexBundles) {
-      allMatcherBundles.add(bundle);
-      allMatcherBundlesInfo.add(bundle.getMatcherInfo());
-    }
-    for (FilterBundle.MatcherBundle bundle : matcherOnlyBundles) {
-      allMatcherBundles.add(bundle);
-      allMatcherBundlesInfo.add(bundle.getMatcherInfo());
+  private static VectorValueMatcher makeVectorMatcher(final 
VectorValueMatcher[] baseMatchers)
+  {
+    Preconditions.checkState(baseMatchers.length > 0);
+    if (baseMatchers.length == 1) {
+      return baseMatchers[0];
     }
 
-    return new FilterBundle(
-        null,
-        new FilterBundle.MatcherBundle()
-        {
-          @Override
-          public FilterBundle.MatcherBundleInfo getMatcherInfo()
-          {
-            return new FilterBundle.MatcherBundleInfo(
-                () -> "OR",
-                null,
-                allMatcherBundlesInfo
-            );
-          }
+    return new BaseVectorValueMatcher(baseMatchers[0])
+    {
+      final VectorMatch currentMask = VectorMatch.wrap(new 
int[getMaxVectorSize()]);
+      final VectorMatch scratch = VectorMatch.wrap(new 
int[getMaxVectorSize()]);
+      final VectorMatch retVal = VectorMatch.wrap(new int[getMaxVectorSize()]);
 
-          @Override
-          public ValueMatcher valueMatcher(ColumnSelectorFactory 
selectorFactory, Offset baseOffset, boolean descending)
-          {
-            final ValueMatcher[] matchers = new 
ValueMatcher[allMatcherBundles.size()];
-            for (int i = 0; i < allMatcherBundles.size(); i++) {
-              matchers[i] = 
allMatcherBundles.get(i).valueMatcher(selectorFactory, baseOffset, descending);
-            }
-            return makeMatcher(matchers);
-          }
+      @Override
+      public ReadableVectorMatch match(final ReadableVectorMatch mask, boolean 
includeUnknown)
+      {
+        ReadableVectorMatch currentMatch = baseMatchers[0].match(mask, 
includeUnknown);
 
-          @Override
-          public VectorValueMatcher vectorMatcher(
-              VectorColumnSelectorFactory selectorFactory,
-              ReadableVectorOffset baseOffset
-          )
-          {
-            final VectorValueMatcher[] matchers = new 
VectorValueMatcher[allMatcherBundles.size()];
-            for (int i = 0; i < allMatcherBundles.size(); i++) {
-              matchers[i] = 
allMatcherBundles.get(i).vectorMatcher(selectorFactory, baseOffset);
-            }
-            return makeVectorMatcher(matchers);
+        // Initialize currentMask = mask, then progressively remove rows from 
the mask as we find matches for them.
+        // This isn't necessary for correctness (we could use the original 
"mask" on every call to "match") but it
+        // allows for short-circuiting on a row-by-row basis.
+        currentMask.copyFrom(mask);
+
+        // Initialize retVal = currentMatch, the rows matched by the first 
matcher. We'll add more as we loop over
+        // the rest of the matchers.
+        retVal.copyFrom(currentMatch);
+
+        for (int i = 1; i < baseMatchers.length; i++) {
+          if (retVal.isAllTrue(getCurrentVectorSize())) {
+            // Short-circuit if the entire vector is true.
+            break;
           }
 
-          @Override
-          public boolean canVectorize()
-          {
-            for (FilterBundle.MatcherBundle bundle : allMatcherBundles) {
-              if (!bundle.canVectorize()) {
-                return false;
-              }
-            }
-            return true;
+          currentMask.removeAll(currentMatch);
+          currentMatch = baseMatchers[i].match(currentMask, false);
+          retVal.addAll(currentMatch, scratch);
+
+          if (currentMatch == currentMask) {
+            // baseMatchers[i] matched every remaining row. Short-circuit out.
+            break;
           }
         }
-    );
-  }
-
-  @Nullable
-  @Override
-  public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
-  {
-    if (filters.size() == 1) {
-      return Iterables.getOnlyElement(filters).getBitmapColumnIndex(selector);
-    }
 
-    List<BitmapColumnIndex> bitmapColumnIndices = new 
ArrayList<>(filters.size());
-    ColumnIndexCapabilities merged = new SimpleColumnIndexCapabilities(true, 
true);
-    for (Filter filter : filters) {
-      BitmapColumnIndex index = filter.getBitmapColumnIndex(selector);
-      if (index == null) {
-        // all or nothing
-        return null;
+        assert retVal.isValid(mask);
+        return retVal;
       }
-      merged = merged.merge(index.getIndexCapabilities());
-      bitmapColumnIndices.add(index);
-    }
+    };
+  }
 
-    final ColumnIndexCapabilities finalMerged = merged;
-    return new BitmapColumnIndex()
+  /**
+   * Convert a {@link FilterBundle} that has both {@link 
FilterBundle#getIndex()} and
+   * {@link FilterBundle#getMatcherBundle()} into a 'matcher only' bundle by 
converting the index into a matcher
+   * with {@link #convertIndexToValueMatcher(ReadableOffset, ImmutableBitmap, 
boolean)} and
+   * {@link #convertIndexToVectorValueMatcher(ReadableVectorOffset, 
ImmutableBitmap)} and then doing a logical AND
+   * with the bundles matchers.
+   */
+  private static FilterBundle.MatcherBundle convertBundleToMatcherOnlyBundle(
+      FilterBundle bundle,
+      ImmutableBitmap bundleIndex
+  )
+  {
+    return new FilterBundle.MatcherBundle()
     {
       @Override
-      public ColumnIndexCapabilities getIndexCapabilities()
+      public FilterBundle.MatcherBundleInfo getMatcherInfo()
       {
-        return finalMerged;
-      }
-
-      @Override
-      public <T> T computeBitmapResult(BitmapResultFactory<T> 
bitmapResultFactory, boolean includeUnknown)
-      {
-        return bitmapResultFactory.union(
-            () -> bitmapColumnIndices.stream().map(x -> 
x.computeBitmapResult(bitmapResultFactory, includeUnknown)).iterator()
+        return new FilterBundle.MatcherBundleInfo(
+            () -> "AND",
+            bundle.getIndex().getIndexInfo(),
+            
Collections.singletonList(bundle.getMatcherBundle().getMatcherInfo())
         );
       }
 
-      @Nullable
       @Override
-      public <T> T computeBitmapResult(
-          BitmapResultFactory<T> bitmapResultFactory,
-          int applyRowCount,
-          int totalRowCount,
-          boolean includeUnknown
+      public ValueMatcher valueMatcher(
+          ColumnSelectorFactory selectorFactory,
+          Offset baseOffset,
+          boolean descending
       )
       {
-        List<T> results = 
Lists.newArrayListWithCapacity(bitmapColumnIndices.size());
-        for (BitmapColumnIndex index : bitmapColumnIndices) {
-          final T r = index.computeBitmapResult(bitmapResultFactory, 
applyRowCount, totalRowCount, includeUnknown);
-          if (r == null) {
-            // all or nothing
-            return null;
-          }
-          results.add(r);
-        }
-        return bitmapResultFactory.union(results);
-      }
-    };
-  }
-
-  @Override
-  public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
-  {
-    final ValueMatcher[] matchers = new ValueMatcher[filters.size()];
-
-    int i = 0;
-    for (Filter filter : filters) {
-      matchers[i++] = filter.makeMatcher(factory);
-    }
-    return makeMatcher(matchers);
-  }
-
-  @Override
-  public VectorValueMatcher makeVectorMatcher(final 
VectorColumnSelectorFactory factory)
-  {
-    final VectorValueMatcher[] matchers = new 
VectorValueMatcher[filters.size()];
-
-    int i = 0;
-    for (Filter filter : filters) {
-      matchers[i++] = filter.makeVectorMatcher(factory);
-    }
-    return makeVectorMatcher(matchers);
-  }
-
-  @Override
-  public boolean canVectorizeMatcher(ColumnInspector inspector)
-  {
-    return filters.stream().allMatch(filter -> 
filter.canVectorizeMatcher(inspector));
-  }
-
-  @Override
-  public LinkedHashSet<Filter> getFilters()
-  {
-    return filters;
-  }
-
-  @Override
-  public boolean supportsRequiredColumnRewrite()
-  {
-    for (Filter filter : filters) {
-      if (!filter.supportsRequiredColumnRewrite()) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  @Override
-  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
-  {
-    final List<Filter> newFilters = new ArrayList<>(filters.size());
-    for (Filter filter : filters) {
-      newFilters.add(filter.rewriteRequiredColumns(columnRewrites));
-    }
-    return new OrFilter(newFilters);
-  }
-
-  @Override
-  public String toString()
-  {
-    return StringUtils.format("(%s)", OR_JOINER.join(filters));
-  }
-
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    OrFilter orFilter = (OrFilter) o;
-    return Objects.equals(getFilters(), orFilter.getFilters());
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return Objects.hash(getFilters());
-  }
-
-
-  private static ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers)
-  {
-    Preconditions.checkState(baseMatchers.length > 0);
-
-    if (baseMatchers.length == 1) {
-      return baseMatchers[0];
-    }
-
-    return new ValueMatcher()
-    {
-      @Override
-      public boolean matches(boolean includeUnknown)
-      {
-        for (ValueMatcher matcher : baseMatchers) {
-          if (matcher.matches(includeUnknown)) {
-            return true;
-          }
-        }
-        return false;
+        return AndFilter.makeMatcher(
+            new ValueMatcher[]{
+                convertIndexToValueMatcher(baseOffset.getBaseReadableOffset(), 
bundleIndex, descending),
+                bundle.getMatcherBundle().valueMatcher(selectorFactory, 
baseOffset, descending)
+            }
+        );
       }
 
       @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      public VectorValueMatcher vectorMatcher(
+          VectorColumnSelectorFactory selectorFactory,
+          ReadableVectorOffset baseOffset
+      )
       {
-        inspector.visit("firstBaseMatcher", baseMatchers[0]);
-        inspector.visit("secondBaseMatcher", baseMatchers[1]);
-        // Don't inspect the 3rd and all consequent baseMatchers, cut runtime 
shape combinations at this point.
-        // Anyway if the filter is so complex, Hotspot won't inline all calls 
because of the inline limit.
+        return AndFilter.makeVectorMatcher(
+            new VectorValueMatcher[]{
+                convertIndexToVectorValueMatcher(
+                    baseOffset,
+                    bundleIndex
+                ),
+                bundle.getMatcherBundle().vectorMatcher(selectorFactory, 
baseOffset)
+            }
+        );
       }
-    };
-  }
-
-  private static VectorValueMatcher makeVectorMatcher(final 
VectorValueMatcher[] baseMatchers)
-  {
-    Preconditions.checkState(baseMatchers.length > 0);
-    if (baseMatchers.length == 1) {
-      return baseMatchers[0];
-    }
-
-    return new BaseVectorValueMatcher(baseMatchers[0])
-    {
-      final VectorMatch currentMask = VectorMatch.wrap(new 
int[getMaxVectorSize()]);
-      final VectorMatch scratch = VectorMatch.wrap(new 
int[getMaxVectorSize()]);
-      final VectorMatch retVal = VectorMatch.wrap(new int[getMaxVectorSize()]);
 
       @Override
-      public ReadableVectorMatch match(final ReadableVectorMatch mask, boolean 
includeUnknown)
+      public boolean canVectorize()
       {
-        ReadableVectorMatch currentMatch = baseMatchers[0].match(mask, 
includeUnknown);
-
-        // Initialize currentMask = mask, then progressively remove rows from 
the mask as we find matches for them.
-        // This isn't necessary for correctness (we could use the original 
"mask" on every call to "match") but it
-        // allows for short-circuiting on a row-by-row basis.
-        currentMask.copyFrom(mask);
-
-        // Initialize retVal = currentMatch, the rows matched by the first 
matcher. We'll add more as we loop over
-        // the rest of the matchers.
-        retVal.copyFrom(currentMatch);
-
-        for (int i = 1; i < baseMatchers.length; i++) {
-          if (retVal.isAllTrue(getCurrentVectorSize())) {
-            // Short-circuit if the entire vector is true.
-            break;
-          }
-
-          currentMask.removeAll(currentMatch);
-          currentMatch = baseMatchers[i].match(currentMask, false);
-          retVal.addAll(currentMatch, scratch);
-
-          if (currentMatch == currentMask) {
-            // baseMatchers[i] matched every remaining row. Short-circuit out.
-            break;
-          }
-        }
-
-        assert retVal.isValid(mask);
-        return retVal;
+        return bundle.getMatcherBundle() == null || 
bundle.getMatcherBundle().canVectorize();
       }
     };
   }
 
   /**
-   * Convert a {@link FilterBundle} that has both {@link 
FilterBundle#getIndex()} and
-   * {@link FilterBundle#getMatcherBundle()} into a 'matcher only' bundle by 
converting the index into a matcher
-   * with {@link #convertIndexToValueMatcher(ReadableOffset, ImmutableBitmap, 
boolean)} and
-   * {@link #convertIndexToVectorValueMatcher(ReadableVectorOffset, 
ImmutableBitmap)} and then doing a logical AND
-   * with the bundles matchers.
+   * Convert an index into a matcher bundle, using
+   * {@link #convertIndexToValueMatcher(ReadableOffset, ImmutableBitmap, 
boolean)} and
+   * {@link #convertIndexToVectorValueMatcher(ReadableVectorOffset, 
ImmutableBitmap)}
    */
-  private static FilterBundle.MatcherBundle convertBundleToMatcherOnlyBundle(
-      FilterBundle bundle,
-      ImmutableBitmap bundleIndex
+  private static FilterBundle.MatcherBundle convertIndexToMatcherBundle(
+      int selectionRowCount,
+      List<FilterBundle.IndexBundle> indexOnlyBundles,
+      List<FilterBundle.IndexBundleInfo> indexOnlyBundlesInfo,
+      long totalBitmapConstructTimeNs,
+      ImmutableBitmap partialIndex
   )
   {
     return new FilterBundle.MatcherBundle()
@@ -500,10 +241,22 @@ public class OrFilter implements BooleanFilter
       @Override
       public FilterBundle.MatcherBundleInfo getMatcherInfo()
       {
+        if (indexOnlyBundles.size() == 1) {
+          return new FilterBundle.MatcherBundleInfo(
+              indexOnlyBundles.get(0).getIndexInfo()::getFilter,
+              indexOnlyBundles.get(0).getIndexInfo(),
+              null
+          );
+        }
         return new FilterBundle.MatcherBundleInfo(
-            () -> "AND",
-            bundle.getIndex().getIndexInfo(),
-            
Collections.singletonList(bundle.getMatcherBundle().getMatcherInfo())
+            () -> "OR",
+            new FilterBundle.IndexBundleInfo(
+                () -> "OR",
+                selectionRowCount,
+                totalBitmapConstructTimeNs,
+                indexOnlyBundlesInfo
+            ),
+            null
         );
       }
 
@@ -514,12 +267,7 @@ public class OrFilter implements BooleanFilter
           boolean descending
       )
       {
-        return AndFilter.makeMatcher(
-            new ValueMatcher[]{
-                convertIndexToValueMatcher(baseOffset.getBaseReadableOffset(), 
bundleIndex, descending),
-                bundle.getMatcherBundle().valueMatcher(selectorFactory, 
baseOffset, descending)
-            }
-        );
+        return convertIndexToValueMatcher(baseOffset.getBaseReadableOffset(), 
partialIndex, descending);
       }
 
       @Override
@@ -528,220 +276,479 @@ public class OrFilter implements BooleanFilter
           ReadableVectorOffset baseOffset
       )
       {
-        return AndFilter.makeVectorMatcher(
-            new VectorValueMatcher[]{
-                convertIndexToVectorValueMatcher(
-                    baseOffset,
-                    bundleIndex
-                ),
-                bundle.getMatcherBundle().vectorMatcher(selectorFactory, 
baseOffset)
-            }
-        );
+        return convertIndexToVectorValueMatcher(baseOffset, partialIndex);
       }
 
       @Override
       public boolean canVectorize()
       {
-        return bundle.getMatcherBundle() == null || 
bundle.getMatcherBundle().canVectorize();
+        return true;
+      }
+    };
+  }
+
+  private static ValueMatcher convertIndexToValueMatcher(
+      final ReadableOffset offset,
+      final ImmutableBitmap rowBitmap,
+      boolean descending
+  )
+  {
+
+    if (descending) {
+
+      final IntIterator iter = 
BitmapOffset.getReverseBitmapOffsetIterator(rowBitmap);
+
+      if (!iter.hasNext()) {
+        return ValueMatchers.allFalse();
+      }
+      return new ValueMatcher()
+      {
+        int iterOffset = Integer.MAX_VALUE;
+
+        @Override
+        public boolean matches(boolean includeUnknown)
+        {
+          int currentOffset = offset.getOffset();
+          while (iterOffset > currentOffset && iter.hasNext()) {
+            iterOffset = iter.next();
+          }
+
+          return iterOffset == currentOffset;
+        }
+
+        @Override
+        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+        {
+          inspector.visit("offset", offset);
+          inspector.visit("iter", iter);
+        }
+      };
+    } else {
+      final PeekableIntIterator peekableIterator = 
rowBitmap.peekableIterator();
+
+      if (!peekableIterator.hasNext()) {
+        return ValueMatchers.allFalse();
+      }
+      return new ValueMatcher()
+      {
+        int iterOffset = -1;
+
+        @Override
+        public boolean matches(boolean includeUnknown)
+        {
+          int currentOffset = offset.getOffset();
+          peekableIterator.advanceIfNeeded(currentOffset);
+          if (peekableIterator.hasNext()) {
+            iterOffset = peekableIterator.peekNext();
+          }
+
+          return iterOffset == currentOffset;
+        }
+
+        @Override
+        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+        {
+          inspector.visit("offset", offset);
+          inspector.visit("peekableIterator", peekableIterator);
+        }
+      };
+    }
+  }
+
+  private static VectorValueMatcher convertIndexToVectorValueMatcher(
+      final ReadableVectorOffset vectorOffset,
+      final ImmutableBitmap bitmap
+  )
+  {
+    final PeekableIntIterator peekableIntIterator = bitmap.peekableIterator();
+    if (!peekableIntIterator.hasNext()) {
+      return BooleanVectorValueMatcher.of(vectorOffset, 
ConstantMatcherType.ALL_FALSE);
+    }
+
+    return new VectorValueMatcher()
+    {
+      final VectorMatch match = VectorMatch.wrap(new 
int[vectorOffset.getMaxVectorSize()]);
+      int iterOffset = -1;
+
+      @Override
+      public ReadableVectorMatch match(ReadableVectorMatch mask, boolean 
includeUnknown)
+      {
+        final int[] selection = match.getSelection();
+        if (vectorOffset.isContiguous()) {
+          int numRows = 0;
+          for (int i = 0; i < mask.getSelectionSize(); i++) {
+            final int maskNum = mask.getSelection()[i];
+            final int rowNum = vectorOffset.getStartOffset() + maskNum;
+            peekableIntIterator.advanceIfNeeded(rowNum);
+            if (peekableIntIterator.hasNext()) {
+              iterOffset = peekableIntIterator.peekNext();
+              if (iterOffset == rowNum) {
+                selection[numRows++] = maskNum;
+              }
+            }
+          }
+          match.setSelectionSize(numRows);
+          return match;
+        } else {
+          final int[] currentOffsets = vectorOffset.getOffsets();
+          int numRows = 0;
+          for (int i = 0; i < mask.getSelectionSize(); i++) {
+            final int maskNum = mask.getSelection()[i];
+            final int rowNum = currentOffsets[mask.getSelection()[i]];
+            peekableIntIterator.advanceIfNeeded(rowNum);
+            if (peekableIntIterator.hasNext()) {
+              iterOffset = peekableIntIterator.peekNext();
+              if (iterOffset == rowNum) {
+                selection[numRows++] = maskNum;
+              }
+            }
+          }
+          match.setSelectionSize(numRows);
+          return match;
+        }
+      }
+
+      @Override
+      public int getMaxVectorSize()
+      {
+        return vectorOffset.getMaxVectorSize();
+      }
+
+      @Override
+      public int getCurrentVectorSize()
+      {
+        return vectorOffset.getCurrentVectorSize();
+      }
+    };
+  }
+
+  @Override
+  public <T> FilterBundle makeFilterBundle(
+      FilterBundle.Builder filterBundleBuilder,
+      BitmapResultFactory<T> bitmapResultFactory,
+      int applyRowCount,
+      int totalRowCount,
+      boolean includeUnknown
+  )
+  {
+    // for OR filters, we have a few possible outcomes:
+    // 1 - all clauses are index only bundles. in this case we union the 
bitmaps together and make an index only bundle
+    // 2 - some clauses support indexes. in this case, we union the bitmaps of 
any index only bundles together to form a
+    //     partial index which is constructed into a matcher bundle with 
convertIndexToMatcherBundle. We translate any
+    //     index AND matcher bundles into a matcher only bundle with 
convertBundleToMatcherOnlyBundle. Finally, we
+    //     combine these with the remaining matcher only bundles to with 
makeMatcher/makeVectorMatcher to make a matcher
+    //     only bundle
+    // 3 - no clauses support indexes. in this case, we make a matcher only 
bundle using makeMatcher/makeVectorMatcher
+
+    final List<FilterBundle.IndexBundle> indexOnlyBundles = new ArrayList<>();
+    final List<FilterBundle.IndexBundleInfo> indexOnlyBundlesInfo = new 
ArrayList<>();
+    final List<FilterBundle.MatcherBundle> partialIndexBundles = new 
ArrayList<>();
+    final List<FilterBundle.MatcherBundle> matcherOnlyBundles = new 
ArrayList<>();
+
+    int indexUnionSize = 0;
+    ImmutableBitmap index = null;
+    ColumnIndexCapabilities merged = new SimpleColumnIndexCapabilities(true, 
true);
+    int emptyCount = 0;
+
+    final long bitmapConstructionStartNs = System.nanoTime();
+    for (FilterBundle.Builder subFilterBundleBuilder : 
filterBundleBuilder.getChildBuilders()) {
+      final FilterBundle bundle = subFilterBundleBuilder.build(
+          bitmapResultFactory,
+          Math.min(applyRowCount, totalRowCount - indexUnionSize),
+          totalRowCount,
+          includeUnknown
+      );
+      if (bundle.hasIndex()) {
+        final ImmutableBitmap bundleIndex = bundle.getIndex().getBitmap();
+        if (bundleIndex.isEmpty()) {
+          // we leave any indexes which are empty out of index, 
indexOnlyBundles, and partialIndexBundles
+          // even though we skip them, we still keep track of them to check 
for the case when we can build the OR into
+          // an index only bundle. We can count index and matcher bundles here 
too because the AND operation means that
+          // an empty index means the matcher can be skipped
+          emptyCount++;
+        } else {
+          if (bundle.hasMatcher()) {
+            // index and matcher bundles must be handled separately, they will 
need to be a single value matcher built
+            // by doing an AND operation between the index and the value 
matcher
+            // (a bundle is basically an AND operation between the index and 
matcher if the matcher is present)
+            partialIndexBundles.add(convertBundleToMatcherOnlyBundle(bundle, 
bundleIndex));
+          } else {
+            indexOnlyBundles.add(bundle.getIndex());
+            indexOnlyBundlesInfo.add(bundle.getIndex().getIndexInfo());
+            merged.merge(bundle.getIndex().getIndexCapabilities());
+            // union index only bitmaps together; if all sub-filters are 
'index only' bundles we will make an index only
+            // bundle ourselves, else we will use this index as a single value 
matcher
+            if (index == null) {
+              index = bundle.getIndex().getBitmap();
+            } else {
+              index = index.union(bundle.getIndex().getBitmap());
+            }
+            indexUnionSize = index.size();
+          }
+        }
+      } else {
+        matcherOnlyBundles.add(bundle.getMatcherBundle());
+      }
+    }
+    final long totalBitmapConstructTimeNs = System.nanoTime() - 
bitmapConstructionStartNs;
+
+
+    // if all the filters are 'index only', we can make an index only bundle
+    if (indexOnlyBundles.size() + emptyCount == filters.size()) {
+      if (index == null || index.isEmpty()) {
+        return FilterBundle.allFalse(
+            totalBitmapConstructTimeNs,
+            
filterBundleBuilder.getColumnIndexSelector().getBitmapFactory().makeEmptyImmutableBitmap()
+        );
+      }
+      if (indexOnlyBundles.size() == 1) {
+        return new FilterBundle(
+            indexOnlyBundles.get(0),
+            null
+        );
+      }
+      return new FilterBundle(
+          new FilterBundle.SimpleIndexBundle(
+              new FilterBundle.IndexBundleInfo(
+                  () -> "OR",
+                  applyRowCount,
+                  totalBitmapConstructTimeNs,
+                  indexOnlyBundlesInfo
+              ),
+              index,
+              merged
+          ),
+          null
+      );
+    }
+
+    // if not the index only outcome, we build a matcher only bundle from all 
the matchers
+    final int estimatedSize = (indexOnlyBundles.isEmpty() ? 0 : 1)
+                              + partialIndexBundles.size()
+                              + matcherOnlyBundles.size();
+    final List<FilterBundle.MatcherBundle> allMatcherBundles = 
Lists.newArrayListWithCapacity(estimatedSize);
+    final List<FilterBundle.MatcherBundleInfo> allMatcherBundlesInfo = 
Lists.newArrayListWithCapacity(estimatedSize);
+    if (!indexOnlyBundles.isEmpty()) {
+      // translate the indexOnly bundles into a single matcher
+      final FilterBundle.MatcherBundle matcherBundle = 
convertIndexToMatcherBundle(
+          applyRowCount,
+          indexOnlyBundles,
+          indexOnlyBundlesInfo,
+          totalBitmapConstructTimeNs,
+          index
+      );
+      allMatcherBundles.add(matcherBundle);
+      allMatcherBundlesInfo.add(matcherBundle.getMatcherInfo());
+    }
+    for (FilterBundle.MatcherBundle bundle : partialIndexBundles) {
+      allMatcherBundles.add(bundle);
+      allMatcherBundlesInfo.add(bundle.getMatcherInfo());
+    }
+    for (FilterBundle.MatcherBundle bundle : matcherOnlyBundles) {
+      allMatcherBundles.add(bundle);
+      allMatcherBundlesInfo.add(bundle.getMatcherInfo());
+    }
+
+    return new FilterBundle(
+        null,
+        new FilterBundle.MatcherBundle()
+        {
+          @Override
+          public FilterBundle.MatcherBundleInfo getMatcherInfo()
+          {
+            return new FilterBundle.MatcherBundleInfo(
+                () -> "OR",
+                null,
+                allMatcherBundlesInfo
+            );
+          }
+
+          @Override
+          public ValueMatcher valueMatcher(ColumnSelectorFactory 
selectorFactory, Offset baseOffset, boolean descending)
+          {
+            final ValueMatcher[] matchers = new 
ValueMatcher[allMatcherBundles.size()];
+            for (int i = 0; i < allMatcherBundles.size(); i++) {
+              matchers[i] = 
allMatcherBundles.get(i).valueMatcher(selectorFactory, baseOffset, descending);
+            }
+            return makeMatcher(matchers);
+          }
+
+          @Override
+          public VectorValueMatcher vectorMatcher(
+              VectorColumnSelectorFactory selectorFactory,
+              ReadableVectorOffset baseOffset
+          )
+          {
+            final VectorValueMatcher[] matchers = new 
VectorValueMatcher[allMatcherBundles.size()];
+            for (int i = 0; i < allMatcherBundles.size(); i++) {
+              matchers[i] = 
allMatcherBundles.get(i).vectorMatcher(selectorFactory, baseOffset);
+            }
+            return makeVectorMatcher(matchers);
+          }
+
+          @Override
+          public boolean canVectorize()
+          {
+            for (FilterBundle.MatcherBundle bundle : allMatcherBundles) {
+              if (!bundle.canVectorize()) {
+                return false;
+              }
+            }
+            return true;
+          }
+        }
+    );
+  }
+
+  @Nullable
+  @Override
+  public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
+  {
+    if (filters.size() == 1) {
+      return Iterables.getOnlyElement(filters).getBitmapColumnIndex(selector);
+    }
+
+    List<BitmapColumnIndex> bitmapColumnIndices = new 
ArrayList<>(filters.size());
+    ColumnIndexCapabilities merged = new SimpleColumnIndexCapabilities(true, 
true);
+    for (Filter filter : filters) {
+      BitmapColumnIndex index = filter.getBitmapColumnIndex(selector);
+      if (index == null) {
+        // all or nothing
+        return null;
       }
-    };
-  }
+      merged = merged.merge(index.getIndexCapabilities());
+      bitmapColumnIndices.add(index);
+    }
 
-  /**
-   * Convert an index into a matcher bundle, using
-   * {@link #convertIndexToValueMatcher(ReadableOffset, ImmutableBitmap, 
boolean)} and
-   * {@link #convertIndexToVectorValueMatcher(ReadableVectorOffset, 
ImmutableBitmap)}
-   */
-  private static FilterBundle.MatcherBundle convertIndexToMatcherBundle(
-      int selectionRowCount,
-      List<FilterBundle.IndexBundle> indexOnlyBundles,
-      List<FilterBundle.IndexBundleInfo> indexOnlyBundlesInfo,
-      long totalBitmapConstructTimeNs,
-      ImmutableBitmap partialIndex
-  )
-  {
-    return new FilterBundle.MatcherBundle()
+    final ColumnIndexCapabilities finalMerged = merged;
+    return new BitmapColumnIndex()
     {
       @Override
-      public FilterBundle.MatcherBundleInfo getMatcherInfo()
+      public ColumnIndexCapabilities getIndexCapabilities()
       {
-        if (indexOnlyBundles.size() == 1) {
-          return new FilterBundle.MatcherBundleInfo(
-              indexOnlyBundles.get(0).getIndexInfo()::getFilter,
-              indexOnlyBundles.get(0).getIndexInfo(),
-              null
-          );
-        }
-        return new FilterBundle.MatcherBundleInfo(
-            () -> "OR",
-            new FilterBundle.IndexBundleInfo(
-                () -> "OR",
-                selectionRowCount,
-                totalBitmapConstructTimeNs,
-                indexOnlyBundlesInfo
-            ),
-            null
-        );
+        return finalMerged;
       }
 
       @Override
-      public ValueMatcher valueMatcher(
-          ColumnSelectorFactory selectorFactory,
-          Offset baseOffset,
-          boolean descending
-      )
+      public int estimatedComputeCost()
       {
-        return convertIndexToValueMatcher(baseOffset.getBaseReadableOffset(), 
partialIndex, descending);
+        // There's no additional cost on OR filter, cost in child filters 
would be summed.
+        return 0;
       }
 
       @Override
-      public VectorValueMatcher vectorMatcher(
-          VectorColumnSelectorFactory selectorFactory,
-          ReadableVectorOffset baseOffset
-      )
+      public <T> T computeBitmapResult(BitmapResultFactory<T> 
bitmapResultFactory, boolean includeUnknown)
       {
-        return convertIndexToVectorValueMatcher(baseOffset, partialIndex);
+        return bitmapResultFactory.union(
+            () -> bitmapColumnIndices.stream()
+                                     .map(x -> 
x.computeBitmapResult(bitmapResultFactory, includeUnknown))
+                                     .iterator()
+        );
       }
 
+      @Nullable
       @Override
-      public boolean canVectorize()
+      public <T> T computeBitmapResult(
+          BitmapResultFactory<T> bitmapResultFactory,
+          int applyRowCount,
+          int totalRowCount,
+          boolean includeUnknown
+      )
       {
-        return true;
+        List<T> results = 
Lists.newArrayListWithCapacity(bitmapColumnIndices.size());
+        for (BitmapColumnIndex index : bitmapColumnIndices) {
+          final T r = index.computeBitmapResult(bitmapResultFactory, 
applyRowCount, totalRowCount, includeUnknown);
+          if (r == null) {
+            // all or nothing
+            return null;
+          }
+          results.add(r);
+        }
+        return bitmapResultFactory.union(results);
       }
     };
   }
 
-  private static ValueMatcher convertIndexToValueMatcher(
-      final ReadableOffset offset,
-      final ImmutableBitmap rowBitmap,
-      boolean descending
-  )
+  @Override
+  public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
   {
+    final ValueMatcher[] matchers = new ValueMatcher[filters.size()];
 
-    if (descending) {
-
-      final IntIterator iter = 
BitmapOffset.getReverseBitmapOffsetIterator(rowBitmap);
+    int i = 0;
+    for (Filter filter : filters) {
+      matchers[i++] = filter.makeMatcher(factory);
+    }
+    return makeMatcher(matchers);
+  }
 
-      if (!iter.hasNext()) {
-        return ValueMatchers.allFalse();
-      }
-      return new ValueMatcher()
-      {
-        int iterOffset = Integer.MAX_VALUE;
+  @Override
+  public VectorValueMatcher makeVectorMatcher(final 
VectorColumnSelectorFactory factory)
+  {
+    final VectorValueMatcher[] matchers = new 
VectorValueMatcher[filters.size()];
 
-        @Override
-        public boolean matches(boolean includeUnknown)
-        {
-          int currentOffset = offset.getOffset();
-          while (iterOffset > currentOffset && iter.hasNext()) {
-            iterOffset = iter.next();
-          }
+    int i = 0;
+    for (Filter filter : filters) {
+      matchers[i++] = filter.makeVectorMatcher(factory);
+    }
+    return makeVectorMatcher(matchers);
+  }
 
-          return iterOffset == currentOffset;
-        }
+  @Override
+  public boolean canVectorizeMatcher(ColumnInspector inspector)
+  {
+    return filters.stream().allMatch(filter -> 
filter.canVectorizeMatcher(inspector));
+  }
 
-        @Override
-        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-        {
-          inspector.visit("offset", offset);
-          inspector.visit("iter", iter);
-        }
-      };
-    } else {
-      final PeekableIntIterator peekableIterator = 
rowBitmap.peekableIterator();
+  @Override
+  public LinkedHashSet<Filter> getFilters()
+  {
+    return filters;
+  }
 
-      if (!peekableIterator.hasNext()) {
-        return ValueMatchers.allFalse();
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    for (Filter filter : filters) {
+      if (!filter.supportsRequiredColumnRewrite()) {
+        return false;
       }
-      return new ValueMatcher()
-      {
-        int iterOffset = -1;
-
-        @Override
-        public boolean matches(boolean includeUnknown)
-        {
-          int currentOffset = offset.getOffset();
-          peekableIterator.advanceIfNeeded(currentOffset);
-          if (peekableIterator.hasNext()) {
-            iterOffset = peekableIterator.peekNext();
-          }
-
-          return iterOffset == currentOffset;
-        }
-
-        @Override
-        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-        {
-          inspector.visit("offset", offset);
-          inspector.visit("peekableIterator", peekableIterator);
-        }
-      };
     }
+
+    return true;
   }
 
-  private static VectorValueMatcher convertIndexToVectorValueMatcher(
-      final ReadableVectorOffset vectorOffset,
-      final ImmutableBitmap bitmap
-  )
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
   {
-    final PeekableIntIterator peekableIntIterator = bitmap.peekableIterator();
-    if (!peekableIntIterator.hasNext()) {
-      return BooleanVectorValueMatcher.of(vectorOffset, 
ConstantMatcherType.ALL_FALSE);
+    final List<Filter> newFilters = new ArrayList<>(filters.size());
+    for (Filter filter : filters) {
+      newFilters.add(filter.rewriteRequiredColumns(columnRewrites));
     }
+    return new OrFilter(newFilters);
+  }
 
-    return new VectorValueMatcher()
-    {
-      final VectorMatch match = VectorMatch.wrap(new 
int[vectorOffset.getMaxVectorSize()]);
-      int iterOffset = -1;
-      @Override
-      public ReadableVectorMatch match(ReadableVectorMatch mask, boolean 
includeUnknown)
-      {
-        final int[] selection = match.getSelection();
-        if (vectorOffset.isContiguous()) {
-          int numRows = 0;
-          for (int i = 0; i < mask.getSelectionSize(); i++) {
-            final int maskNum = mask.getSelection()[i];
-            final int rowNum = vectorOffset.getStartOffset() + maskNum;
-            peekableIntIterator.advanceIfNeeded(rowNum);
-            if (peekableIntIterator.hasNext()) {
-              iterOffset = peekableIntIterator.peekNext();
-              if (iterOffset == rowNum) {
-                selection[numRows++] = maskNum;
-              }
-            }
-          }
-          match.setSelectionSize(numRows);
-          return match;
-        } else {
-          final int[] currentOffsets = vectorOffset.getOffsets();
-          int numRows = 0;
-          for (int i = 0; i < mask.getSelectionSize(); i++) {
-            final int maskNum = mask.getSelection()[i];
-            final int rowNum = currentOffsets[mask.getSelection()[i]];
-            peekableIntIterator.advanceIfNeeded(rowNum);
-            if (peekableIntIterator.hasNext()) {
-              iterOffset = peekableIntIterator.peekNext();
-              if (iterOffset == rowNum) {
-                selection[numRows++] = maskNum;
-              }
-            }
-          }
-          match.setSelectionSize(numRows);
-          return match;
-        }
-      }
+  @Override
+  public String toString()
+  {
+    return StringUtils.format("(%s)", OR_JOINER.join(filters));
+  }
 
-      @Override
-      public int getMaxVectorSize()
-      {
-        return vectorOffset.getMaxVectorSize();
-      }
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OrFilter orFilter = (OrFilter) o;
+    return Objects.equals(getFilters(), orFilter.getFilters());
+  }
 
-      @Override
-      public int getCurrentVectorSize()
-      {
-        return vectorOffset.getCurrentVectorSize();
-      }
-    };
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(getFilters());
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/index/BitmapColumnIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/index/BitmapColumnIndex.java
index 04a5bb8b6b5..f28429969d3 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/index/BitmapColumnIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/index/BitmapColumnIndex.java
@@ -35,6 +35,14 @@ public interface BitmapColumnIndex
 {
   ColumnIndexCapabilities getIndexCapabilities();
 
+  /**
+   * Returns an estimated cost for computing the bitmap result.
+   */
+  default int estimatedComputeCost()
+  {
+    return Integer.MAX_VALUE;
+  }
+
   /**
    * Compute a bitmap result wrapped with the {@link BitmapResultFactory} 
representing the rows matched by this index.
    * If building a cursor, use {@link 
#computeBitmapResult(BitmapResultFactory, int, int, boolean)} instead.
@@ -45,7 +53,6 @@ public interface BitmapColumnIndex
    *                            to true, bitmaps returned by this method 
should include true bits for any rows where
    *                            the matching result is 'unknown', such as from 
the input being null valued.
    *                            See {@link NullHandling#useThreeValueLogic()}.
-   *
    * @return bitmap result representing rows matched by this index
    */
   <T> T computeBitmapResult(
@@ -69,7 +76,6 @@ public interface BitmapColumnIndex
    *                            set to true, bitmaps returned by this method 
should include true bits for any rows where
    *                            the matching result is 'unknown', such as from 
the input being null valued.
    *                            See {@link NullHandling#useThreeValueLogic()}.
-   *
    * @return bitmap result representing rows matched by this index
    */
   @Nullable
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/FilterBundleTest.java
 
b/processing/src/test/java/org/apache/druid/segment/filter/FilterBundleTest.java
index aa764535444..9bc86ae3900 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/FilterBundleTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/FilterBundleTest.java
@@ -43,7 +43,12 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 public class FilterBundleTest extends InitializedNullHandlingTest
 {
   private Closer closer;
@@ -53,6 +58,15 @@ public class FilterBundleTest extends 
InitializedNullHandlingTest
   @Rule
   public TemporaryFolder tmpDir = new TemporaryFolder();
 
+  @Parameters
+  public static Object[] flags()
+  {
+    return new Object[]{false, true};
+  }
+
+  @Parameter
+  public boolean cursorAutoArrangeFilters;
+
   @Before
   public void setUp()
   {
@@ -317,8 +331,7 @@ public class FilterBundleTest extends 
InitializedNullHandlingTest
 
   protected FilterBundle makeFilterBundle(final Filter filter)
   {
-    return filter.makeFilterBundle(
-        indexSelector,
+    return new FilterBundle.Builder(filter, indexSelector, 
cursorAutoArrangeFilters).build(
         new DefaultBitmapResultFactory(bitmapFactory),
         indexSelector.getNumRows(),
         indexSelector.getNumRows(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to