This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new cd6bbd53ee4 introduce SegmentPruner interface to abstract segment 
pruning operations (#19042)
cd6bbd53ee4 is described below

commit cd6bbd53ee42388ee86678e44b11bafc002687e1
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Feb 24 09:14:39 2026 -0800

    introduce SegmentPruner interface to abstract segment pruning operations 
(#19042)
    
    changes:
    * add new `SegmentPruner` interface as a replacement for 
`DimFilterUtils.filterShards` static method
    * `SegmentPruner` interface has a single 'prune' method that takes an 
`Iterable<T>` and `Function` to extract a `DataSegment` from the iterable 
objects, returning a `Collection<T>` of the filtered objects
    * `DimFilterUtils.filterShards` has been removed, its logic has been 
migrated to `FilterSegmentPruner` implementation of the new `SegmentPruner` 
interface
    * `TableInputSpec` now has a `SegmentPruner` field instead of separate 
`filter` and `filterFields` (these fields still remain for compat purposes)
    * transition `DartTableInputSpecSlicer`, `IndexerTableInputSpecSlicer, and 
`CachingClusteredClient` to use `SegmentPruner` (the former 2 via the 
`TableInputSpec`)
---
 docs/api-reference/sql-api.md                      |   9 -
 .../dart/controller/DartTableInputSpecSlicer.java  |  30 +-
 .../org/apache/druid/msq/exec/StageProcessor.java  |  12 +
 .../msq/indexing/IndexerTableInputSpecSlicer.java  |  26 +-
 .../org/apache/druid/msq/input/InputSlice.java     |   2 +-
 .../apache/druid/msq/input/InputSpecSlicer.java    |  15 +-
 .../apache/druid/msq/input/MapInputSpecSlicer.java |   9 +-
 .../input/external/ExternalInputSpecSlicer.java    |   9 +-
 .../msq/input/inline/InlineInputSpecSlicer.java    |   5 +-
 .../msq/input/lookup/LookupInputSpecSlicer.java    |   5 +-
 .../msq/input/stage/StageInputSpecSlicer.java      |   5 +-
 .../druid/msq/input/table/TableInputSpec.java      |  59 +---
 .../apache/druid/msq/kernel/StageDefinition.java   |  11 +
 .../druid/msq/kernel/WorkerAssignmentStrategy.java |  10 +-
 .../druid/msq/kernel/controller/WorkerInputs.java  |   5 +-
 .../apache/druid/msq/logical/stages/ReadStage.java |   2 +-
 .../druid/msq/querykit/BaseLeafStageProcessor.java |  17 +-
 .../apache/druid/msq/querykit/DataSourcePlan.java  |  53 +---
 .../druid/msq/querykit/WindowOperatorQueryKit.java |   2 -
 .../msq/querykit/groupby/GroupByQueryKit.java      |   2 -
 .../druid/msq/querykit/scan/ScanQueryKit.java      |   2 -
 .../controller/DartTableInputSpecSlicerTest.java   |  35 +--
 .../org/apache/druid/msq/input/InputSpecsTest.java |  12 +-
 .../external/ExternalInputSpecSlicerTest.java      |  28 +-
 .../msq/input/stage/StageInputSpecSlicerTest.java  |   8 +-
 .../table/IndexerTableInputSpecSlicerTest.java     |  81 +++---
 .../druid/msq/input/table/TableInputSpecTest.java  |  43 +--
 .../controller/ControllerTestInputSpecSlicer.java  |   5 +-
 .../msq/kernel/controller/WorkerInputsTest.java    |   9 +-
 .../msq1.iq                                        |   9 +-
 .../msqJoinHint.iq                                 |  27 +-
 .../msqNestedJoinHint.iq                           |   9 +-
 .../apache/druid/query/filter/DimFilterUtils.java  |  95 -------
 .../druid/query/filter/FilterSegmentPruner.java    | 136 +++++++++
 .../apache/druid/query/filter/SegmentPruner.java   |  24 +-
 .../druid/query/planning/ExecutionVertex.java      | 304 +++++++++++----------
 .../druid/query/filter/DimFilterUtilsTest.java     | 126 ---------
 .../query/filter/FilterSegmentPrunerTest.java      | 126 +++++++++
 .../qaWin/sql_explain.msq.iq                       |  36 +--
 .../druid/client/CachingClusteredClient.java       |  30 +-
 web-console/src/druid-models/stages/stages.mock.ts |   7 -
 web-console/src/druid-models/stages/stages.ts      |   2 -
 .../execution-stages-pane.tsx                      |   2 -
 43 files changed, 675 insertions(+), 769 deletions(-)

diff --git a/docs/api-reference/sql-api.md b/docs/api-reference/sql-api.md
index af60cee4c80..37b79e83c3b 100644
--- a/docs/api-reference/sql-api.md
+++ b/docs/api-reference/sql-api.md
@@ -911,15 +911,6 @@ Host: http://ROUTER_IP:ROUTER_PORT
                         "dataSource": "wikipedia",
                         "intervals": [
                             
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
-                        ],
-                        "filter": {
-                            "type": "equals",
-                            "column": "user",
-                            "matchValueType": "STRING",
-                            "matchValue": "BlueMoon2662"
-                        },
-                        "filterFields": [
-                            "user"
                         ]
                     }
                 ],
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
index 4b7ea2ed8ee..a9b4bc6b070 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.dart.controller;
 
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import org.apache.druid.client.QueryableDruidServer;
@@ -44,13 +45,15 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.CloneQueryMode;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.filter.DimFilterUtils;
+import org.apache.druid.query.filter.SegmentPruner;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -122,7 +125,11 @@ public class DartTableInputSpecSlicer implements 
InputSpecSlicer
   }
 
   @Override
-  public List<InputSlice> sliceStatic(final InputSpec inputSpec, final int 
maxNumSlices)
+  public List<InputSlice> sliceStatic(
+      final InputSpec inputSpec,
+      @Nullable final SegmentPruner segmentPruner,
+      final int maxNumSlices
+  )
   {
     final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
     final TimelineLookup<String, ServerSelector> timeline =
@@ -132,9 +139,10 @@ public class DartTableInputSpecSlicer implements 
InputSpecSlicer
       return Collections.emptyList();
     }
 
-    final Set<DartQueryableSegment> prunedSegments =
+    final Collection<DartQueryableSegment> prunedSegments =
         findQueryableDataSegments(
             tableInputSpec,
+            segmentPruner,
             timeline,
             serverSelector -> findWorkerForServerSelector(serverSelector, 
maxNumSlices)
         );
@@ -183,6 +191,7 @@ public class DartTableInputSpecSlicer implements 
InputSpecSlicer
   @Override
   public List<InputSlice> sliceDynamic(
       final InputSpec inputSpec,
+      @Nullable final SegmentPruner segmentPruner,
       final int maxNumSlices,
       final int maxFilesPerSlice,
       final long maxBytesPerSlice
@@ -222,8 +231,9 @@ public class DartTableInputSpecSlicer implements 
InputSpecSlicer
    * Pull the list of {@link DataSegment} that we should query, along with a 
clipping interval for each one, and
    * a worker to get it from.
    */
-  private Set<DartQueryableSegment> findQueryableDataSegments(
+  private Collection<DartQueryableSegment> findQueryableDataSegments(
       final TableInputSpec tableInputSpec,
+      @Nullable final SegmentPruner segmentPruner,
       final TimelineLookup<?, ServerSelector> timeline,
       final ToIntFunction<ServerSelector> toWorkersFunction
   )
@@ -242,14 +252,10 @@ public class DartTableInputSpecSlicer implements 
InputSpecSlicer
                                   })
                                   .filter(segment -> 
!segment.getSegment().isTombstone())
                       );
-
-    return DimFilterUtils.filterShards(
-        tableInputSpec.getFilter(),
-        tableInputSpec.getFilterFields(),
-        allSegments,
-        segment -> segment.getSegment().getShardSpec(),
-        new HashMap<>()
-    );
+    if (segmentPruner == null) {
+      return ImmutableSet.copyOf(allSegments);
+    }
+    return segmentPruner.prune(allSegments, DartQueryableSegment::getSegment);
   }
 
   private DartQueryableSegment toDartQueryableSegment(
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/StageProcessor.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/StageProcessor.java
index 0b6ff41f12c..da47511e848 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/StageProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/StageProcessor.java
@@ -24,7 +24,9 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.frame.processor.OutputChannelFactory;
 import org.apache.druid.msq.indexing.processor.SegmentGeneratorStageProcessor;
+import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.query.filter.SegmentPruner;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
@@ -77,4 +79,14 @@ public interface StageProcessor<R, ExtraInfoType>
    */
   @SuppressWarnings("rawtypes")
   ExtraInfoHolder makeExtraInfoHolder(@Nullable ExtraInfoType extra);
+
+  /**
+   * Produces an optional {@link SegmentPruner}, which can used for 
best-effort pruning of {@link DataSegment} objects
+   * by processors which deal with them
+   */
+  @Nullable
+  default SegmentPruner getPruner(InputSpec inputSpec, int inputNumber)
+  {
+    return null;
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
index a7b8cb6b0a8..ddbfdde9854 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
 import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.client.coordinator.CoordinatorClient;
@@ -44,7 +45,7 @@ import org.apache.druid.msq.input.table.RichSegmentDescriptor;
 import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.input.table.TableInputSpec;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.filter.DimFilterUtils;
+import org.apache.druid.query.filter.SegmentPruner;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
@@ -96,14 +97,14 @@ public class IndexerTableInputSpecSlicer implements 
InputSpecSlicer
   }
 
   @Override
-  public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+  public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable 
SegmentPruner segmentPruner, int maxNumSlices)
   {
     final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
 
     final List<WeightedInputInstance> prunedPublishedSegments = new 
ArrayList<>();
     final List<DataSegmentWithInterval> prunedServedSegments = new 
ArrayList<>();
 
-    for (DataSegmentWithInterval dataSegmentWithInterval : 
getPrunedSegmentSet(tableInputSpec)) {
+    for (DataSegmentWithInterval dataSegmentWithInterval : 
getPrunedSegmentSet(tableInputSpec, segmentPruner)) {
       if (dataSegmentWithInterval.segment instanceof DataSegmentWithLocation) {
         prunedServedSegments.add(dataSegmentWithInterval);
       } else {
@@ -125,6 +126,7 @@ public class IndexerTableInputSpecSlicer implements 
InputSpecSlicer
   @Override
   public List<InputSlice> sliceDynamic(
       InputSpec inputSpec,
+      @Nullable SegmentPruner segmentPruner,
       int maxNumSlices,
       int maxFilesPerSlice,
       long maxBytesPerSlice
@@ -135,7 +137,7 @@ public class IndexerTableInputSpecSlicer implements 
InputSpecSlicer
     final List<WeightedInputInstance> prunedSegments = new ArrayList<>();
     final List<DataSegmentWithInterval> prunedServedSegments = new 
ArrayList<>();
 
-    for (DataSegmentWithInterval dataSegmentWithInterval : 
getPrunedSegmentSet(tableInputSpec)) {
+    for (DataSegmentWithInterval dataSegmentWithInterval : 
getPrunedSegmentSet(tableInputSpec, segmentPruner)) {
       if (dataSegmentWithInterval.segment instanceof DataSegmentWithLocation) {
         prunedServedSegments.add(dataSegmentWithInterval);
       } else {
@@ -157,7 +159,10 @@ public class IndexerTableInputSpecSlicer implements 
InputSpecSlicer
     return makeSlices(tableInputSpec, assignments);
   }
 
-  private Set<DataSegmentWithInterval> getPrunedSegmentSet(final 
TableInputSpec tableInputSpec)
+  private Collection<DataSegmentWithInterval> getPrunedSegmentSet(
+      final TableInputSpec tableInputSpec,
+      @Nullable final SegmentPruner segmentPruner
+  )
   {
     final TimelineLookup<String, DataSegment> timeline =
         getTimeline(tableInputSpec.getDataSource(), 
tableInputSpec.getIntervals());
@@ -182,13 +187,10 @@ public class IndexerTableInputSpecSlicer implements 
InputSpecSlicer
                                              .map(segment -> new 
DataSegmentWithInterval(segment, holder.getInterval()))
                         ).iterator();
 
-      return DimFilterUtils.filterShards(
-          tableInputSpec.getFilter(),
-          tableInputSpec.getFilterFields(),
-          () -> dataSegmentIterator,
-          segment -> segment.getSegment().getShardSpec(),
-          new HashMap<>()
-      );
+      if (segmentPruner == null) {
+        return ImmutableSet.copyOf(dataSegmentIterator);
+      }
+      return segmentPruner.prune(() -> dataSegmentIterator, 
DataSegmentWithInterval::getSegment);
     }
   }
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
index 90a6970485e..ccd08bc1f9b 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
  * Slice of an {@link InputSpec} assigned to a particular worker.
  *
  * On the controller, these are produced using {@link InputSpecSlicer}. On 
workers, these are transformed into
- * {@link ReadableSlice} using {@link InputSliceReader}.
+ * {@link PhysicalInputSlice} using {@link InputSliceReader}.
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 public interface InputSlice
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicer.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicer.java
index ff1808e463c..4e29cd7eb61 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicer.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicer.java
@@ -19,6 +19,9 @@
 
 package org.apache.druid.msq.input;
 
+import org.apache.druid.query.filter.SegmentPruner;
+
+import javax.annotation.Nullable;
 import java.util.List;
 
 /**
@@ -28,7 +31,7 @@ import java.util.List;
 public interface InputSpecSlicer
 {
   /**
-   * Whether {@link #sliceDynamic(InputSpec, int, int, long)} is usable for a 
given {@link InputSpec}.
+   * Whether {@link #sliceDynamic(InputSpec, SegmentPruner, int, int, long)} 
is usable for a given {@link InputSpec}.
    */
   boolean canSliceDynamic(InputSpec inputSpec);
 
@@ -39,7 +42,7 @@ public interface InputSpecSlicer
    * This method creates as many slices as possible while staying at or under 
maxNumSlices. For example, if a spec
    * contains 8 files, and maxNumSlices is 10, then 8 slices will be created.
    */
-  List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices);
+  List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable SegmentPruner 
segmentPruner, int maxNumSlices);
 
   /**
    * Slice a spec based on a particular maximum number of files and bytes per 
slice.
@@ -58,5 +61,11 @@ public interface InputSpecSlicer
    *
    * @throws UnsupportedOperationException if {@link 
#canSliceDynamic(InputSpec)} returns false
    */
-  List<InputSlice> sliceDynamic(InputSpec inputSpec, int maxNumSlices, int 
maxFilesPerSlice, long maxBytesPerSlice);
+  List<InputSlice> sliceDynamic(
+      InputSpec inputSpec,
+      @Nullable SegmentPruner segmentPruner,
+      int maxNumSlices,
+      int maxFilesPerSlice,
+      long maxBytesPerSlice
+  );
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/MapInputSpecSlicer.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/MapInputSpecSlicer.java
index f26a0459c89..9677e0bad96 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/MapInputSpecSlicer.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/MapInputSpecSlicer.java
@@ -21,7 +21,9 @@ package org.apache.druid.msq.input;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.filter.SegmentPruner;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -44,20 +46,21 @@ public class MapInputSpecSlicer implements InputSpecSlicer
   }
 
   @Override
-  public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+  public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable 
SegmentPruner segmentPruner, int maxNumSlices)
   {
-    return getSlicer(inputSpec.getClass()).sliceStatic(inputSpec, 
maxNumSlices);
+    return getSlicer(inputSpec.getClass()).sliceStatic(inputSpec, 
segmentPruner, maxNumSlices);
   }
 
   @Override
   public List<InputSlice> sliceDynamic(
       InputSpec inputSpec,
+      @Nullable SegmentPruner segmentPruner,
       int maxNumSlices,
       int maxFilesPerSlice,
       long maxBytesPerSlice
   )
   {
-    return getSlicer(inputSpec.getClass()).sliceDynamic(inputSpec, 
maxNumSlices, maxFilesPerSlice, maxBytesPerSlice);
+    return getSlicer(inputSpec.getClass()).sliceDynamic(inputSpec, 
segmentPruner, maxNumSlices, maxFilesPerSlice, maxBytesPerSlice);
   }
 
   private InputSpecSlicer getSlicer(final Class<? extends InputSpec> clazz)
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
index 7c6700ce3a2..8db9d57324e 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
@@ -31,7 +31,9 @@ import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.NilInputSlice;
 import org.apache.druid.msq.input.SlicerUtils;
+import org.apache.druid.query.filter.SegmentPruner;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -52,7 +54,7 @@ public class ExternalInputSpecSlicer implements 
InputSpecSlicer
   }
 
   @Override
-  public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+  public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable 
SegmentPruner segmentPruner, int maxNumSlices)
   {
     final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
 
@@ -70,6 +72,7 @@ public class ExternalInputSpecSlicer implements 
InputSpecSlicer
   @Override
   public List<InputSlice> sliceDynamic(
       final InputSpec inputSpec,
+      @Nullable final SegmentPruner segmentPruner,
       final int maxNumSlices,
       final int maxFilesPerSlice,
       final long maxBytesPerSlice
@@ -176,7 +179,7 @@ public class ExternalInputSpecSlicer implements 
InputSpecSlicer
   }
 
   /**
-   * Split hint spec used by {@link #sliceStatic(InputSpec, int)}.
+   * Split hint spec used by {@link #sliceStatic(InputSpec, SegmentPruner, 
int)}.
    */
   static class StaticSplitHintSpec implements SplitHintSpec
   {
@@ -205,7 +208,7 @@ public class ExternalInputSpecSlicer implements 
InputSpecSlicer
   }
 
   /**
-   * Split hint spec used by {@link #sliceDynamic(InputSpec, int, int, long)}.
+   * Split hint spec used by {@link #sliceDynamic(InputSpec, SegmentPruner, 
int, int, long)}.
    */
   static class DynamicSplitHintSpec implements SplitHintSpec
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSpecSlicer.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSpecSlicer.java
index d72482df4e9..7bc7f14205a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSpecSlicer.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSpecSlicer.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.input.inline;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
 
+import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
 
@@ -38,7 +40,7 @@ public class InlineInputSpecSlicer implements InputSpecSlicer
   }
 
   @Override
-  public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+  public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable 
SegmentPruner segmentPruner, int maxNumSlices)
   {
     return Collections.singletonList(new InlineInputSlice(((InlineInputSpec) 
inputSpec).getDataSource()));
   }
@@ -46,6 +48,7 @@ public class InlineInputSpecSlicer implements InputSpecSlicer
   @Override
   public List<InputSlice> sliceDynamic(
       InputSpec inputSpec,
+      @Nullable SegmentPruner segmentPruner,
       int maxNumSlices,
       int maxFilesPerSlice,
       long maxBytesPerSlice
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSpecSlicer.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSpecSlicer.java
index 27c4a7f2f24..8fe7770cc38 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSpecSlicer.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSpecSlicer.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.input.lookup;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
 
+import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
 
@@ -38,7 +40,7 @@ public class LookupInputSpecSlicer implements InputSpecSlicer
   }
 
   @Override
-  public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+  public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable 
SegmentPruner segmentPruner, int maxNumSlices)
   {
     return Collections.singletonList(new LookupInputSlice(((LookupInputSpec) 
inputSpec).getLookupName()));
   }
@@ -46,6 +48,7 @@ public class LookupInputSpecSlicer implements InputSpecSlicer
   @Override
   public List<InputSlice> sliceDynamic(
       InputSpec inputSpec,
+      @Nullable SegmentPruner segmentPruner,
       int maxNumSlices,
       int maxFilesPerSlice,
       long maxBytesPerSlice
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StageInputSpecSlicer.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StageInputSpecSlicer.java
index f3b5d23ae4f..74b092a12e5 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StageInputSpecSlicer.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StageInputSpecSlicer.java
@@ -25,7 +25,9 @@ import org.apache.druid.msq.exec.OutputChannelMode;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -56,7 +58,7 @@ public class StageInputSpecSlicer implements InputSpecSlicer
   }
 
   @Override
-  public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+  public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable 
SegmentPruner segmentPruner, int maxNumSlices)
   {
     final StageInputSpec stageInputSpec = (StageInputSpec) inputSpec;
 
@@ -91,6 +93,7 @@ public class StageInputSpecSlicer implements InputSpecSlicer
   @Override
   public List<InputSlice> sliceDynamic(
       InputSpec inputSpec,
+      @Nullable SegmentPruner segmentPruner,
       int maxNumSlices,
       int maxFilesPerSlice,
       long maxBytesPerSlice
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
index 697c65599a6..a7e88578110 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
@@ -29,13 +29,11 @@ import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.LoadableSegment;
 import org.apache.druid.msq.input.PhysicalInputSlice;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.filter.DimFilter;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * Input spec representing a Druid table.
@@ -45,16 +43,9 @@ public class TableInputSpec implements InputSpec
 {
   private final String dataSource;
   private final List<Interval> intervals;
-
   @Nullable
   private final List<SegmentDescriptor> segments;
 
-  @Nullable
-  private final DimFilter filter;
-
-  @Nullable
-  private final Set<String> filterFields;
-
   /**
    * Create a table input spec.
    *
@@ -65,20 +56,12 @@ public class TableInputSpec implements InputSpec
    *                     {@link LoadableSegment#descriptor()}.
    * @param segments     specific segments to read, or null to read all 
segments in the intervals. If provided,
    *                     only these segments will be read. Must not be empty 
if non-null.
-   * @param filter       other filters to use for pruning, or null if no 
pruning is desired. Pruning filters are
-   *                     *not strict*, which means that processors must 
re-apply them when processing the returned
-   *                     {@link LoadableSegment} from {@link 
PhysicalInputSlice#getLoadableSegments()}. This matches how
-   *                     Broker-based pruning works for native queries.
-   * @param filterFields list of fields from {@link 
DimFilter#getRequiredColumns()} to consider for pruning. If null,
-   *                     all fields are considered for pruning.
    */
   @JsonCreator
   public TableInputSpec(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("intervals") @Nullable List<Interval> intervals,
-      @JsonProperty("segments") @Nullable List<SegmentDescriptor> segments,
-      @JsonProperty("filter") @Nullable DimFilter filter,
-      @JsonProperty("filterFields") @Nullable Set<String> filterFields
+      @JsonProperty("segments") @Nullable List<SegmentDescriptor> segments
   )
   {
     this.dataSource = dataSource;
@@ -87,22 +70,6 @@ public class TableInputSpec implements InputSpec
       throw new IAE("Can not supply empty segments as input, please use either 
null or non-empty segments.");
     }
     this.segments = segments;
-    this.filter = filter;
-    this.filterFields = filterFields;
-  }
-
-  /**
-   * @deprecated Use {@link #TableInputSpec(String, List, List, DimFilter, 
Set)} with explicit null for segments instead.
-   */
-  @Deprecated
-  public TableInputSpec(
-      String dataSource,
-      @Nullable List<Interval> intervals,
-      @Nullable DimFilter filter,
-      @Nullable Set<String> filterFields
-  )
-  {
-    this(dataSource, intervals, null, filter, filterFields);
   }
 
   @JsonProperty
@@ -133,22 +100,6 @@ public class TableInputSpec implements InputSpec
     return segments;
   }
 
-  @JsonProperty
-  @JsonInclude(JsonInclude.Include.NON_NULL)
-  @Nullable
-  public DimFilter getFilter()
-  {
-    return filter;
-  }
-
-  @JsonProperty
-  @JsonInclude(JsonInclude.Include.NON_NULL)
-  @Nullable
-  public Set<String> getFilterFields()
-  {
-    return filterFields;
-  }
-
   @Override
   public boolean equals(Object o)
   {
@@ -161,15 +112,13 @@ public class TableInputSpec implements InputSpec
     TableInputSpec that = (TableInputSpec) o;
     return Objects.equals(dataSource, that.dataSource)
            && Objects.equals(intervals, that.intervals)
-           && Objects.equals(segments, that.segments)
-           && Objects.equals(filter, that.filter)
-           && Objects.equals(filterFields, that.filterFields);
+           && Objects.equals(segments, that.segments);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(dataSource, intervals, segments, filter, filterFields);
+    return Objects.hash(dataSource, intervals, segments);
   }
 
   @Override
@@ -179,8 +128,6 @@ public class TableInputSpec implements InputSpec
            "dataSource='" + dataSource + '\'' +
            ", intervals=" + intervals +
            (segments == null ? "" : ", segments=" + segments) +
-           (filter == null ? "" : ", filter=" + filter) +
-           (filterFields == null ? "" : ", filterFields=" + filterFields) +
            '}';
   }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 736195155c6..f836a357d3c 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -50,6 +50,7 @@ import 
org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.filter.SegmentPruner;
 import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
@@ -231,6 +232,16 @@ public class StageDefinition
     }
   }
 
+  /**
+   * Get a {@link SegmentPruner} from the {@link StageProcessor} for a given 
'input number' from {@link #inputSpecs}.
+   * This can be used to best-effort prune the set of {@link 
org.apache.druid.timeline.DataSegment} to process in order
+   * to reduce the working set before processing begins
+   */
+  public SegmentPruner getSegmentPruner(int inputNumber)
+  {
+    return processor.getPruner(inputSpecs.get(inputNumber), inputNumber);
+  }
+
   /**
    * Returns the {@link ShuffleSpec} for this stage, if {@link #doesShuffle()}.
    *
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
index 0c9115ec437..4f9847bad0c 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
@@ -27,7 +27,9 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.OptionalInt;
 
@@ -48,11 +50,12 @@ public enum WorkerAssignmentStrategy
         final InputSpec inputSpec,
         final Int2IntMap stageWorkerCountMap,
         final InputSpecSlicer slicer,
+        @Nullable final SegmentPruner segmentPruner,
         final int maxInputFilesPerSlice,
         final long maxInputBytesPerSlice
     )
     {
-      return slicer.sliceStatic(inputSpec, stageDef.getMaxWorkerCount());
+      return slicer.sliceStatic(inputSpec, segmentPruner, 
stageDef.getMaxWorkerCount());
     }
   },
 
@@ -69,6 +72,7 @@ public enum WorkerAssignmentStrategy
         final InputSpec inputSpec,
         final Int2IntMap stageWorkerCountMap,
         final InputSpecSlicer slicer,
+        @Nullable final SegmentPruner segmentPruner,
         final int maxInputFilesPerSlice,
         final long maxInputBytesPerSlice
     )
@@ -76,6 +80,7 @@ public enum WorkerAssignmentStrategy
       if (slicer.canSliceDynamic(inputSpec)) {
         return slicer.sliceDynamic(
             inputSpec,
+            segmentPruner,
             stageDef.getMaxWorkerCount(),
             maxInputFilesPerSlice,
             maxInputBytesPerSlice
@@ -92,7 +97,7 @@ public enum WorkerAssignmentStrategy
         final IntSet inputStages = stageDef.getInputStageNumbers();
         final OptionalInt maxInputStageWorkerCount = 
inputStages.intStream().map(stageWorkerCountMap).max();
         final int workerCount = Math.min(stageDef.getMaxWorkerCount(), 
maxInputStageWorkerCount.orElse(1));
-        return slicer.sliceStatic(inputSpec, workerCount);
+        return slicer.sliceStatic(inputSpec, segmentPruner, workerCount);
       }
     }
   };
@@ -127,6 +132,7 @@ public enum WorkerAssignmentStrategy
       InputSpec inputSpec,
       Int2IntMap stageWorkerCountMap,
       InputSpecSlicer slicer,
+      @Nullable SegmentPruner segmentPruner,
       int maxInputFilesPerSlice,
       long maxInputBytesPerSlice
   );
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
index 6e1f00b1c84..ef825ed69d3 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
@@ -34,6 +34,7 @@ import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.NilInputSlice;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.query.filter.SegmentPruner;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -79,10 +80,11 @@ public class WorkerInputs
     // Assign input slices to workers.
     for (int inputNumber = 0; inputNumber < numInputs; inputNumber++) {
       final InputSpec inputSpec = stageDef.getInputSpecs().get(inputNumber);
+      final SegmentPruner pruner = stageDef.getSegmentPruner(inputNumber);
 
       if (stageDef.getBroadcastInputNumbers().contains(inputNumber)) {
         // Broadcast case: send everything everywhere.
-        final List<InputSlice> broadcastSlices = slicer.sliceStatic(inputSpec, 
1);
+        final List<InputSlice> broadcastSlices = slicer.sliceStatic(inputSpec, 
pruner, 1);
         final InputSlice broadcastSlice = broadcastSlices.isEmpty()
                                           ? NilInputSlice.INSTANCE
                                           : 
Iterables.getOnlyElement(broadcastSlices);
@@ -100,6 +102,7 @@ public class WorkerInputs
             inputSpec,
             stageWorkerCountMap,
             slicer,
+            pruner,
             maxInputFilesPerWorker,
             maxInputBytesPerWorker
         );
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
index a4eaf099dfe..5ff11bbf723 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
@@ -139,7 +139,7 @@ public class ReadStage extends AbstractFrameProcessorStage
   {
     if (dataSource instanceof TableDataSource) {
       TableDataSource ids = (TableDataSource) dataSource;
-      TableInputSpec inputSpec = new TableInputSpec(ids.getName(), 
Intervals.ONLY_ETERNITY, null, null, null);
+      TableInputSpec inputSpec = new TableInputSpec(ids.getName(), 
Intervals.ONLY_ETERNITY, null);
       return inputSpec;
     }
     if (dataSource instanceof InlineDataSource) {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
index 148eea2b174..357829cdd29 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.msq.querykit;
 
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -43,6 +45,7 @@ import org.apache.druid.msq.exec.std.ProcessorsAndChannels;
 import org.apache.druid.msq.exec.std.StandardPartitionReader;
 import org.apache.druid.msq.exec.std.StandardStageRunner;
 import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.PhysicalInputSlice;
 import org.apache.druid.msq.input.external.ExternalInputSlice;
 import org.apache.druid.msq.input.stage.StageInputSlice;
@@ -50,6 +53,7 @@ import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.filter.SegmentPruner;
 import org.apache.druid.query.planning.ExecutionVertex;
 import org.apache.druid.segment.SegmentMapFunction;
 import org.apache.druid.utils.CollectionUtils;
@@ -70,10 +74,12 @@ import java.util.function.Function;
 public abstract class BaseLeafStageProcessor extends BasicStageProcessor
 {
   private final Query<?> query;
+  private final Supplier<ExecutionVertex> executionVertexSupplier;
 
   protected BaseLeafStageProcessor(Query<?> query)
   {
     this.query = query;
+    this.executionVertexSupplier = Suppliers.memoize(() -> 
ExecutionVertex.of(query));
   }
 
   @Override
@@ -153,7 +159,7 @@ public abstract class BaseLeafStageProcessor extends 
BasicStageProcessor
     final ProcessorManager processorManager;
 
     if (segmentMapFnProcessor == null) {
-      final SegmentMapFunction segmentMapFn = 
ExecutionVertex.of(query).createSegmentMapFunction(frameContext.policyEnforcer());
+      final SegmentMapFunction segmentMapFn = 
executionVertexSupplier.get().createSegmentMapFunction(frameContext.policyEnforcer());
       processorManager = 
processorManagerFn.apply(ImmutableList.of(segmentMapFn));
     } else {
       processorManager = new ChainedProcessorManager<>(
@@ -171,6 +177,13 @@ public abstract class BaseLeafStageProcessor extends 
BasicStageProcessor
     );
   }
 
+  @Nullable
+  @Override
+  public SegmentPruner getPruner(InputSpec inputSpec, int inputNumber)
+  {
+    return executionVertexSupplier.get().getSegmentPruner();
+  }
+
   private ProcessorManager<Object, Long> 
createBaseLeafProcessorManagerWithHandoff(
       final ExecutionContext context,
       final ReadableInputQueue baseInputQueue,
@@ -337,7 +350,7 @@ public abstract class BaseLeafStageProcessor extends 
BasicStageProcessor
     final Int2ObjectMap<ReadableInput> broadcastInputs = 
readBroadcastInputsFromEarlierStages(context);
 
     if (broadcastInputs.isEmpty()) {
-      if (ExecutionVertex.of(query).isSegmentMapFunctionExpensive()) {
+      if (executionVertexSupplier.get().isSegmentMapFunctionExpensive()) {
         // Joins may require significant computation to compute the 
segmentMapFn. Offload it to a processor.
         return new SimpleSegmentMapFnProcessor(query, 
context.frameContext().policyEnforcer());
       } else {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index 51850d7046e..bdc4370f9e7 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -57,8 +57,6 @@ import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.UnionDataSource;
 import org.apache.druid.query.UnnestDataSource;
-import org.apache.druid.query.filter.DimFilter;
-import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.query.planning.JoinDataSourceAnalysis;
 import org.apache.druid.query.planning.PreJoinableClause;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -79,7 +77,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -137,9 +134,6 @@ public class DataSourcePlan
    * @param dataSource       datasource to plan
    * @param querySegmentSpec intervals for mandatory pruning. Must be {@link 
MultipleIntervalSegmentSpec}. The returned
    *                         plan is guaranteed to be filtered to this 
interval.
-   * @param filter           filter for best-effort pruning. The returned plan 
may or may not be filtered to this
-   *                         filter. Query processing must still apply the 
filter to generated correct results.
-   * @param filterFields     which fields from the filter to consider for 
pruning, or null to consider all fields.
    * @param minStageNumber   starting stage number for subqueries
    * @param broadcast        whether the plan should broadcast data for this 
datasource
    */
@@ -148,38 +142,21 @@ public class DataSourcePlan
       final QueryContext queryContext,
       final DataSource dataSource,
       final QuerySegmentSpec querySegmentSpec,
-      @Nullable DimFilter filter,
-      @Nullable Set<String> filterFields,
       final int minStageNumber,
       final boolean broadcast
   )
   {
-    if (!queryContext.isSecondaryPartitionPruningEnabled()) {
-      // Clear filter, we don't want to prune today.
-      filter = null;
-      filterFields = null;
-    }
-
-    if (filter != null && filterFields == null) {
-      // Ensure filterFields is nonnull if filter is nonnull. Helps for other 
forXYZ methods, so they don't need to
-      // deal with the case where filter is nonnull but filterFields is null.
-      filterFields = filter.getRequiredColumns();
-    }
 
     if (dataSource instanceof TableDataSource) {
       return forTable(
           (TableDataSource) dataSource,
           querySegmentSpec,
-          filter,
-          filterFields,
           broadcast
       );
     } else if (dataSource instanceof RestrictedDataSource) {
       return forRestricted(
           (RestrictedDataSource) dataSource,
           querySegmentSpec,
-          filter,
-          filterFields,
           broadcast
       );
     } else if (dataSource instanceof ExternalDataSource) {
@@ -222,8 +199,6 @@ public class DataSourcePlan
           queryContext,
           (UnionDataSource) dataSource,
           querySegmentSpec,
-          filter,
-          filterFields,
           minStageNumber,
           broadcast
       );
@@ -242,8 +217,6 @@ public class DataSourcePlan
               queryContext,
               joinDataSource,
               querySegmentSpec,
-              filter,
-              filterFields,
               minStageNumber,
               broadcast
           );
@@ -369,8 +342,6 @@ public class DataSourcePlan
   private static DataSourcePlan forTable(
       final TableDataSource dataSource,
       final QuerySegmentSpec querySegmentSpec,
-      @Nullable final DimFilter filter,
-      @Nullable final Set<String> filterFields,
       final boolean broadcast
   )
   {
@@ -385,7 +356,7 @@ public class DataSourcePlan
     List<Interval> intervals = querySegmentSpec.getIntervals();
     return new DataSourcePlan(
         (broadcast && dataSource.isGlobal()) ? dataSource : new 
InputNumberDataSource(0),
-        List.of(new TableInputSpec(dataSource.getName(), intervals, segments, 
filter, filterFields)),
+        List.of(new TableInputSpec(dataSource.getName(), intervals, segments)),
         broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
         null
     );
@@ -394,15 +365,13 @@ public class DataSourcePlan
   private static DataSourcePlan forRestricted(
       final RestrictedDataSource dataSource,
       final QuerySegmentSpec querySegmentSpec,
-      @Nullable final DimFilter filter,
-      @Nullable final Set<String> filterFields,
       final boolean broadcast
   )
   {
     DataSource restricted = (broadcast && dataSource.isGlobal())
                             ? dataSource
                             : new RestrictedInputNumberDataSource(0, 
dataSource.getPolicy());
-    return forTable(dataSource.getBase(), querySegmentSpec, filter, 
filterFields, broadcast).withDataSource(restricted);
+    return forTable(dataSource.getBase(), querySegmentSpec, 
broadcast).withDataSource(restricted);
   }
 
   private static DataSourcePlan forExternal(
@@ -490,8 +459,6 @@ public class DataSourcePlan
         queryContext,
         dataSource.getBase(),
         querySegmentSpec,
-        null,
-        null,
         minStageNumber,
         broadcast
     );
@@ -527,8 +494,6 @@ public class DataSourcePlan
         queryContext,
         dataSource.getBase(),
         querySegmentSpec,
-        null,
-        null,
         minStageNumber,
         broadcast
     );
@@ -557,8 +522,6 @@ public class DataSourcePlan
       final QueryContext queryContext,
       final UnionDataSource unionDataSource,
       final QuerySegmentSpec querySegmentSpec,
-      @Nullable DimFilter filter,
-      @Nullable Set<String> filterFields,
       final int minStageNumber,
       final boolean broadcast
   )
@@ -577,8 +540,6 @@ public class DataSourcePlan
           queryContext,
           child,
           querySegmentSpec,
-          filter,
-          filterFields,
           Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
           broadcast
       );
@@ -606,8 +567,6 @@ public class DataSourcePlan
       final QueryContext queryContext,
       final JoinDataSource dataSource,
       final QuerySegmentSpec querySegmentSpec,
-      @Nullable final DimFilter filter,
-      @Nullable final Set<String> filterFields,
       final int minStageNumber,
       final boolean broadcast
   )
@@ -620,8 +579,6 @@ public class DataSourcePlan
         queryContext,
         analysis.getBaseDataSource(),
         querySegmentSpec,
-        filter,
-        filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, 
analysis::isBaseColumn),
         Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
         broadcast
     );
@@ -638,8 +595,6 @@ public class DataSourcePlan
           queryContext,
           clause.getDataSource(),
           new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY),
-          null, // Don't push down query filters for right-hand side: needs 
some work to ensure it works properly.
-          null,
           Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
           true // Always broadcast right-hand side of the join.
       );
@@ -805,9 +760,7 @@ public class DataSourcePlan
    * Verify that the provided {@link QuerySegmentSpec} is a {@link 
MultipleIntervalSegmentSpec} with
    * interval {@link Intervals#ETERNITY}. If not, throw an {@link 
UnsupportedOperationException}.
    * <p>
-   * We don't need to support this for anything that is not {@link 
DataSourceAnalysis#isTableBased()}, because
-   * the SQL layer avoids "intervals" in other cases. See
-   * {@link 
org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}.
+   * See {@link 
org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}.
    */
   private static void checkQuerySegmentSpecIsEternity(
       final DataSource dataSource,
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index 94ef7ac4cbf..7a2ec6f239a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -78,8 +78,6 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
         originalQuery.context(),
         originalQuery.getDataSource(),
         originalQuery.getQuerySegmentSpec(),
-        originalQuery.getFilter(),
-        null,
         minStageNumber,
         false
     );
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 6dacc20c6ce..8ace680938c 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -83,8 +83,6 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
         originalQuery.context(),
         originalQuery.getDataSource(),
         originalQuery.getQuerySegmentSpec(),
-        originalQuery.getFilter(),
-        null,
         minStageNumber,
         false
     );
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
index 95f3d200b33..f6cc9222df9 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
@@ -88,8 +88,6 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
         originalQuery.context(),
         originalQuery.getDataSource(),
         originalQuery.getQuerySegmentSpec(),
-        originalQuery.getFilter(),
-        null,
         minStageNumber,
         false
     );
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
index 2ed6fefbfe5..b18ee8dde53 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
@@ -45,6 +45,7 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.query.filter.FilterSegmentPruner;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
@@ -235,11 +236,11 @@ public class DartTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
   {
     // This slicer cannot sliceDynamic.
 
-    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null, null, null);
+    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null);
     Assertions.assertFalse(slicer.canSliceDynamic(inputSpec));
     Assertions.assertThrows(
         UnsupportedOperationException.class,
-        () -> slicer.sliceDynamic(inputSpec, 1, 1, 1)
+        () -> slicer.sliceDynamic(inputSpec, null, 1, 1, 1)
     );
   }
 
@@ -249,8 +250,8 @@ public class DartTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     // When 1 slice is requested, all segments are assigned to one server, 
even if that server doesn't actually
     // currently serve those segments.
 
-    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null, null, null);
-    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 1);
+    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null);
+    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null, 
1);
     Assertions.assertEquals(
         ImmutableList.of(
             new SegmentsInputSlice(
@@ -305,8 +306,8 @@ public class DartTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
   {
     // When 2 slices are requested, we assign segments to the servers that 
have those segments.
 
-    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null, null, null);
-    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
+    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null);
+    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null, 
2);
     Assertions.assertEquals(
         ImmutableList.of(
             new SegmentsInputSlice(
@@ -367,8 +368,8 @@ public class DartTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
   {
     // When 3 slices are requested, only 2 are returned, because we only have 
two workers.
 
-    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null, null, null);
-    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 3);
+    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null);
+    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null, 
3);
     Assertions.assertEquals(
         ImmutableList.of(
             new SegmentsInputSlice(
@@ -428,8 +429,8 @@ public class DartTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
   @Test
   public void test_sliceStatic_nonexistentTable()
   {
-    final TableInputSpec inputSpec = new 
TableInputSpec(DATASOURCE_NONEXISTENT, null, null, null, null);
-    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 1);
+    final TableInputSpec inputSpec = new 
TableInputSpec(DATASOURCE_NONEXISTENT, null, null);
+    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null, 
1);
     Assertions.assertEquals(
         Collections.emptyList(),
         inputSlices
@@ -444,12 +445,14 @@ public class DartTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec inputSpec = new TableInputSpec(
         DATASOURCE,
         null,
-        null,
+        null
+    );
+    final FilterSegmentPruner pruner = new FilterSegmentPruner(
         new EqualityFilter(PARTITION_DIM, ColumnType.STRING, "abc", null),
         null
     );
 
-    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
+    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, pruner, 
2);
 
     Assertions.assertEquals(
         ImmutableList.of(
@@ -507,12 +510,10 @@ public class DartTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec inputSpec = new TableInputSpec(
         DATASOURCE,
         Collections.singletonList(Intervals.of("2000/P1Y")),
-        null,
-        null,
         null
     );
 
-    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
+    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null, 
2);
 
     Assertions.assertEquals(
         ImmutableList.of(
@@ -553,8 +554,8 @@ public class DartTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
 
     // When 2 slices are requested, we assign segments to the servers that 
have those segments.
 
-    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null, null, null);
-    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
+    final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, 
null);
+    final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null, 
2);
     // Expect segment 2 and then the realtime segments 5 and 6 to be assigned 
round-robin.
     Assertions.assertEquals(
         ImmutableList.of(
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java
index 1b7510fd136..84044c64b5b 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java
@@ -63,7 +63,7 @@ public class InputSpecsTest
   {
     Assert.assertFalse(
         InputSpecs.hasLeafInputs(
-            ImmutableList.of(new TableInputSpec("tbl", null, null, null, 
null)),
+            ImmutableList.of(new TableInputSpec("tbl", null, null)),
             IntSet.of(0)
         )
     );
@@ -75,7 +75,7 @@ public class InputSpecsTest
     Assert.assertTrue(
         InputSpecs.hasLeafInputs(
             ImmutableList.of(
-                new TableInputSpec("tbl", null, null, null, null),
+                new TableInputSpec("tbl", null, null),
                 new StageInputSpec(0)
             ),
             IntSets.emptySet()
@@ -89,7 +89,7 @@ public class InputSpecsTest
     Assert.assertTrue(
         InputSpecs.hasLeafInputs(
             ImmutableList.of(
-                new TableInputSpec("tbl", null, null, null, null),
+                new TableInputSpec("tbl", null, null),
                 new StageInputSpec(0)
             ),
             IntSet.of(1)
@@ -103,7 +103,7 @@ public class InputSpecsTest
     Assert.assertFalse(
         InputSpecs.hasLeafInputs(
             ImmutableList.of(
-                new TableInputSpec("tbl", null, null, null, null),
+                new TableInputSpec("tbl", null, null),
                 new StageInputSpec(0)
             ),
             IntSet.of(0)
@@ -117,8 +117,8 @@ public class InputSpecsTest
     Assert.assertTrue(
         InputSpecs.hasLeafInputs(
             ImmutableList.of(
-                new TableInputSpec("tbl", null, null, null, null),
-                new TableInputSpec("tbl2", null, null, null, null)
+                new TableInputSpec("tbl", null, null),
+                new TableInputSpec("tbl2", null, null)
             ),
             IntSet.of(1)
         )
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
index d8dff6fe082..17692d3fc7c 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
@@ -82,7 +82,7 @@ public class ExternalInputSpecSlicerTest
   {
     Assert.assertEquals(
         ImmutableList.of(unsplittableSlice("foo", "bar", "baz")),
-        slicer.sliceStatic(unsplittableSpec("foo", "bar", "baz"), 2)
+        slicer.sliceStatic(unsplittableSpec("foo", "bar", "baz"), null, 2)
     );
   }
 
@@ -91,7 +91,7 @@ public class ExternalInputSpecSlicerTest
   {
     Assert.assertEquals(
         ImmutableList.of(unsplittableSlice()),
-        slicer.sliceStatic(unsplittableSpec(), 2)
+        slicer.sliceStatic(unsplittableSpec(), null, 2)
     );
   }
 
@@ -103,7 +103,7 @@ public class ExternalInputSpecSlicerTest
             splittableSlice("foo", "baz"),
             splittableSlice("bar")
         ),
-        slicer.sliceStatic(splittableSpec("foo", "bar", "baz"), 2)
+        slicer.sliceStatic(splittableSpec("foo", "bar", "baz"), null, 2)
     );
   }
 
@@ -116,7 +116,7 @@ public class ExternalInputSpecSlicerTest
             splittableSlice("bar"),
             splittableSlice("baz")
         ),
-        slicer.sliceStatic(splittableSpec("foo", "bar", "baz"), 5)
+        slicer.sliceStatic(splittableSpec("foo", "bar", "baz"), null, 5)
     );
   }
 
@@ -125,7 +125,7 @@ public class ExternalInputSpecSlicerTest
   {
     Assert.assertEquals(
         ImmutableList.of(),
-        slicer.sliceStatic(splittableSpec(), 2)
+        slicer.sliceStatic(splittableSpec(), null, 2)
     );
   }
 
@@ -137,7 +137,7 @@ public class ExternalInputSpecSlicerTest
             splittableSlice("foo", "baz"),
             splittableSlice("bar")
         ),
-        slicer.sliceStatic(splittableSpecThatIgnoresSplitHints("foo", "bar", 
"baz"), 2)
+        slicer.sliceStatic(splittableSpecThatIgnoresSplitHints("foo", "bar", 
"baz"), null, 2)
     );
   }
 
@@ -148,7 +148,7 @@ public class ExternalInputSpecSlicerTest
         ImmutableList.of(
             unsplittableSlice("foo", "bar", "baz")
         ),
-        slicer.sliceDynamic(unsplittableSpec("foo", "bar", "baz"), 100, 1, 1)
+        slicer.sliceDynamic(unsplittableSpec("foo", "bar", "baz"), null, 100, 
1, 1)
     );
   }
 
@@ -159,7 +159,7 @@ public class ExternalInputSpecSlicerTest
         ImmutableList.of(
             splittableSlice("foo", "bar", "baz")
         ),
-        slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), 100, 5, 
Long.MAX_VALUE)
+        slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), null, 100, 5, 
Long.MAX_VALUE)
     );
   }
 
@@ -171,7 +171,7 @@ public class ExternalInputSpecSlicerTest
             splittableSlice("foo", "bar"),
             splittableSlice("baz")
         ),
-        slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), 100, 2, 
Long.MAX_VALUE)
+        slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), null, 100, 2, 
Long.MAX_VALUE)
     );
   }
 
@@ -183,7 +183,7 @@ public class ExternalInputSpecSlicerTest
             splittableSlice("foo", "bar"),
             splittableSlice("baz")
         ),
-        slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), 100, 5, 7)
+        slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), null, 100, 5, 
7)
     );
   }
 
@@ -196,7 +196,7 @@ public class ExternalInputSpecSlicerTest
             splittableSlice("bar.gz"),
             splittableSlice("baz.gz")
         ),
-        slicer.sliceDynamic(splittableSpec("foo.gz", "bar.gz", "baz.gz"), 100, 
5, 7)
+        slicer.sliceDynamic(splittableSpec("foo.gz", "bar.gz", "baz.gz"), 
null, 100, 5, 7)
     );
   }
 
@@ -209,7 +209,7 @@ public class ExternalInputSpecSlicerTest
             splittableSlice("bar"),
             splittableSlice("baz")
         ),
-        slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar", 
"baz"), 100, 5, 7)
+        slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar", 
"baz"), null, 100, 5, 7)
     );
   }
 
@@ -221,7 +221,7 @@ public class ExternalInputSpecSlicerTest
             splittableSlice("foo", "baz"),
             splittableSlice("bar")
         ),
-        slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar", 
"baz"), 2, 2, Long.MAX_VALUE)
+        slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar", 
"baz"), null, 2, 2, Long.MAX_VALUE)
     );
   }
 
@@ -232,7 +232,7 @@ public class ExternalInputSpecSlicerTest
         ImmutableList.of(
             splittableSlice("foo", "bar", "baz")
         ),
-        slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar", 
"baz"), 1, 5, Long.MAX_VALUE)
+        slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar", 
"baz"), null, 1, 5, Long.MAX_VALUE)
     );
   }
 
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StageInputSpecSlicerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StageInputSpecSlicerTest.java
index 024ad956cf2..a3ec7d6e5b2 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StageInputSpecSlicerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StageInputSpecSlicerTest.java
@@ -78,7 +78,7 @@ public class StageInputSpecSlicerTest
                 OutputChannelMode.LOCAL_STORAGE
             )
         ),
-        slicer.sliceStatic(new StageInputSpec(0), 1)
+        slicer.sliceStatic(new StageInputSpec(0), null, 1)
     );
   }
 
@@ -98,7 +98,7 @@ public class StageInputSpecSlicerTest
                 OutputChannelMode.LOCAL_STORAGE
             )
         ),
-        slicer.sliceStatic(new StageInputSpec(0), 2)
+        slicer.sliceStatic(new StageInputSpec(0), null, 2)
     );
   }
 
@@ -118,7 +118,7 @@ public class StageInputSpecSlicerTest
                 OutputChannelMode.LOCAL_STORAGE
             )
         ),
-        slicer.sliceStatic(new StageInputSpec(1), 2)
+        slicer.sliceStatic(new StageInputSpec(1), null, 2)
     );
   }
 
@@ -127,7 +127,7 @@ public class StageInputSpecSlicerTest
   {
     final IllegalStateException e = Assert.assertThrows(
         IllegalStateException.class,
-        () -> slicer.sliceStatic(new StageInputSpec(3), 1)
+        () -> slicer.sliceStatic(new StageInputSpec(3), null, 1)
     );
 
     MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("Stage[3] 
output partitions not available"));
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
index 28be6956a13..55ba95a9d98 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
@@ -30,7 +30,9 @@ import org.apache.druid.msq.exec.SegmentSource;
 import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer;
 import org.apache.druid.msq.input.NilInputSlice;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.query.filter.FilterSegmentPruner;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
@@ -144,16 +146,16 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
   @Test
   public void test_canSliceDynamic()
   {
-    Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, 
null, null, null, null)));
+    Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, 
null, null)));
   }
 
   @Test
   public void test_sliceStatic_noDataSource()
   {
-    final TableInputSpec spec = new TableInputSpec("no such datasource", null, 
null, null, null);
+    final TableInputSpec spec = new TableInputSpec("no such datasource", null, 
null);
     Assert.assertEquals(
         ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
-        slicer.sliceStatic(spec, 2)
+        slicer.sliceStatic(spec, null, 2)
     );
   }
 
@@ -166,8 +168,6 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
             Intervals.of("2000/P1M"),
             Intervals.of("2000-06-01/P1M")
         ),
-        null,
-        null,
         null
     );
 
@@ -204,7 +204,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
                 ImmutableList.of()
             )
         ),
-        slicer.sliceStatic(spec, 1)
+        slicer.sliceStatic(spec, null, 1)
     );
   }
 
@@ -214,14 +214,12 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         Collections.singletonList(Intervals.of("2002/P1M")),
-        null,
-        null,
         null
     );
 
     Assert.assertEquals(
         ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
-        slicer.sliceStatic(spec, 2)
+        slicer.sliceStatic(spec, null, 2)
     );
   }
 
@@ -235,9 +233,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
             SEGMENT1.getInterval(),
             SEGMENT1.getVersion(),
             SEGMENT1.getShardSpec().getPartitionNum()
-        )),
-        null,
-        null
+        ))
     );
 
     RichSegmentDescriptor expectedSegment = new RichSegmentDescriptor(
@@ -248,7 +244,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     );
     Assert.assertEquals(
         List.of(new SegmentsInputSlice(DATASOURCE, List.of(expectedSegment), 
List.of())),
-        slicer.sliceStatic(spec, 1));
+        slicer.sliceStatic(spec, null, 1));
   }
 
   @Test
@@ -261,14 +257,12 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
             SEGMENT1.getInterval(),
             SEGMENT1.getVersion(),
             SEGMENT1.getShardSpec().getPartitionNum()
-        )),
-        null,
-        null
+        ))
     );
 
     Assert.assertEquals(
         ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
-        slicer.sliceStatic(spec, 2)
+        slicer.sliceStatic(spec, null, 2)
     );
   }
 
@@ -278,8 +272,10 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         null,
-        null,
-        new SelectorDimFilter("dim", "bar", null),
+        null
+    );
+    final FilterSegmentPruner pruner = new FilterSegmentPruner(
+        new EqualityFilter("dim", ColumnType.STRING, "bar", null),
         null
     );
 
@@ -299,7 +295,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
             ),
             NilInputSlice.INSTANCE
         ),
-        slicer.sliceStatic(spec, 2)
+        slicer.sliceStatic(spec, pruner, 2)
     );
   }
 
@@ -309,8 +305,10 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         null,
-        null,
-        new SelectorDimFilter("dim", "bar", null),
+        null
+    );
+    final FilterSegmentPruner segmentPruner = new FilterSegmentPruner(
+        new EqualityFilter("dim", ColumnType.STRING, "bar", null),
         Collections.emptySet()
     );
 
@@ -335,7 +333,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
                 ImmutableList.of()
             )
         ),
-        slicer.sliceStatic(spec, 1)
+        slicer.sliceStatic(spec, segmentPruner, 1)
     );
   }
 
@@ -348,8 +346,10 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
             Intervals.of("2000/P1M"),
             Intervals.of("2000-06-01/P1M")
         ),
-        null,
-        new SelectorDimFilter("dim", "bar", null),
+        null
+    );
+    final FilterSegmentPruner segmentPruner = new FilterSegmentPruner(
+        new EqualityFilter("dim", ColumnType.STRING, "bar", null),
         null
     );
 
@@ -380,14 +380,14 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
                 ImmutableList.of()
             )
         ),
-        slicer.sliceStatic(spec, 2)
+        slicer.sliceStatic(spec, segmentPruner, 2)
     );
   }
 
   @Test
   public void test_sliceStatic_oneSlice()
   {
-    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, 
null, null);
+    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
     Assert.assertEquals(
         Collections.singletonList(
             new SegmentsInputSlice(
@@ -409,14 +409,14 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
                 ImmutableList.of()
             )
         ),
-        slicer.sliceStatic(spec, 1)
+        slicer.sliceStatic(spec, null, 1)
     );
   }
 
   @Test
   public void test_sliceStatic_needTwoSlices()
   {
-    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, 
null, null);
+    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
     Assert.assertEquals(
         ImmutableList.of(
             new SegmentsInputSlice(
@@ -444,14 +444,14 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
                 ImmutableList.of()
             )
         ),
-        slicer.sliceStatic(spec, 2)
+        slicer.sliceStatic(spec, null, 2)
     );
   }
 
   @Test
   public void test_sliceStatic_threeSlices()
   {
-    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, 
null, null);
+    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
     Assert.assertEquals(
         ImmutableList.of(
             new SegmentsInputSlice(
@@ -480,7 +480,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
             ),
             NilInputSlice.INSTANCE
         ),
-        slicer.sliceStatic(spec, 3)
+        slicer.sliceStatic(spec, null, 3)
     );
   }
 
@@ -490,13 +490,12 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2002/P1M")),
-        null,
         null
     );
 
     Assert.assertEquals(
         Collections.emptyList(),
-        slicer.sliceDynamic(spec, 1, 1, 1)
+        slicer.sliceDynamic(spec, null, 1, 1, 1)
     );
   }
 
@@ -506,7 +505,6 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
-        null,
         null
     );
 
@@ -531,7 +529,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
                 ImmutableList.of()
             )
         ),
-        slicer.sliceDynamic(spec, 1, 1, 1)
+        slicer.sliceDynamic(spec, null, 1, 1, 1)
     );
   }
 
@@ -541,7 +539,6 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
-        null,
         null
     );
 
@@ -566,7 +563,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
                 ImmutableList.of()
             )
         ),
-        slicer.sliceDynamic(spec, 100, 5, BYTES_PER_SEGMENT * 5)
+        slicer.sliceDynamic(spec, null, 100, 5, BYTES_PER_SEGMENT * 5)
     );
   }
 
@@ -576,7 +573,6 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
-        null,
         null
     );
 
@@ -607,7 +603,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
                 ImmutableList.of()
             )
         ),
-        slicer.sliceDynamic(spec, 100, 1, BYTES_PER_SEGMENT * 5)
+        slicer.sliceDynamic(spec, null, 100, 1, BYTES_PER_SEGMENT * 5)
     );
   }
 
@@ -617,7 +613,6 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
-        null,
         null
     );
 
@@ -648,7 +643,7 @@ public class IndexerTableInputSpecSlicerTest extends 
InitializedNullHandlingTest
                 ImmutableList.of()
             )
         ),
-        slicer.sliceDynamic(spec, 100, 5, BYTES_PER_SEGMENT)
+        slicer.sliceDynamic(spec, null, 100, 5, BYTES_PER_SEGMENT)
     );
   }
 }
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
index 452003311db..447f4f65479 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
@@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.msq.guice.MSQIndexingModule;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
@@ -35,38 +34,17 @@ import java.util.Collections;
 
 public class TableInputSpecTest extends InitializedNullHandlingTest
 {
-  @Test
-  public void testSerde() throws Exception
-  {
-    final ObjectMapper mapper = TestHelper.makeJsonMapper()
-                                          .registerModules(new 
MSQIndexingModule().getJacksonModules());
 
-    final TableInputSpec spec = new TableInputSpec(
-        "myds",
-        Collections.singletonList(Intervals.of("2000/P1M")),
-        null,
-        new SelectorDimFilter("dim", "val", null),
-        Collections.singleton("dim")
-    );
-
-    Assert.assertEquals(
-        spec,
-        mapper.readValue(mapper.writeValueAsString(spec), InputSpec.class)
-    );
-  }
+  private final ObjectMapper mapper = TestHelper.makeJsonMapper()
+                                        .registerModules(new 
MSQIndexingModule().getJacksonModules());
 
   @Test
-  public void testSerdeEmptyFilterFields() throws Exception
+  public void testSerde() throws Exception
   {
-    final ObjectMapper mapper = TestHelper.makeJsonMapper()
-                                          .registerModules(new 
MSQIndexingModule().getJacksonModules());
-
     final TableInputSpec spec = new TableInputSpec(
         "myds",
         Collections.singletonList(Intervals.of("2000/P1M")),
-        null,
-        new SelectorDimFilter("dim", "val", null),
-        Collections.emptySet()
+        null
     );
 
     Assert.assertEquals(
@@ -75,17 +53,13 @@ public class TableInputSpecTest extends 
InitializedNullHandlingTest
     );
   }
 
+
   @Test
   public void testSerdeEternityInterval() throws Exception
   {
-    final ObjectMapper mapper = TestHelper.makeJsonMapper()
-                                          .registerModules(new 
MSQIndexingModule().getJacksonModules());
-
     final TableInputSpec spec = new TableInputSpec(
         "myds",
         Intervals.ONLY_ETERNITY,
-        null,
-        new SelectorDimFilter("dim", "val", null),
         null
     );
 
@@ -98,15 +72,10 @@ public class TableInputSpecTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeWithSegments() throws Exception
   {
-    final ObjectMapper mapper = TestHelper.makeJsonMapper()
-                                          .registerModules(new 
MSQIndexingModule().getJacksonModules());
-
     final TableInputSpec spec = new TableInputSpec(
         "myds",
         Collections.singletonList(Intervals.of("2000/P1M")),
-        Collections.singletonList(new 
SegmentDescriptor(Intervals.of("2000/P1M"), "version", 0)),
-        new SelectorDimFilter("dim", "val", null),
-        Collections.singleton("dim")
+        Collections.singletonList(new 
SegmentDescriptor(Intervals.of("2000/P1M"), "version", 0))
     );
 
     Assert.assertEquals(
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerTestInputSpecSlicer.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerTestInputSpecSlicer.java
index 4d4cd9d9fc5..3a5fedb1074 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerTestInputSpecSlicer.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerTestInputSpecSlicer.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.kernel.controller;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,7 +37,7 @@ public class ControllerTestInputSpecSlicer implements 
InputSpecSlicer
   }
 
   @Override
-  public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+  public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable 
SegmentPruner segmentPruner, int maxNumSlices)
   {
     final List<InputSlice> slices = new ArrayList<>(maxNumSlices);
     for (int i = 0; i < maxNumSlices; i++) {
@@ -47,6 +49,7 @@ public class ControllerTestInputSpecSlicer implements 
InputSpecSlicer
   @Override
   public List<InputSlice> sliceDynamic(
       InputSpec inputSpec,
+      @Nullable SegmentPruner segmentPruner,
       int maxNumSlices,
       int maxFilesPerSlice,
       long maxBytesPerSlice
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
index 43f3c5c5635..eae23222622 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
@@ -45,10 +45,12 @@ import 
org.apache.druid.msq.input.stage.StripedReadablePartitions;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
 import org.apache.druid.msq.querykit.common.OffsetLimitStageProcessor;
+import org.apache.druid.query.filter.SegmentPruner;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -466,7 +468,7 @@ public class WorkerInputsTest
     );
 
     Mockito.verify(testInputSpecSlicer, 
times(0)).canSliceDynamic(inputSpecToSplit);
-    Mockito.verify(testInputSpecSlicer, times(1)).sliceStatic(any(), anyInt());
+    Mockito.verify(testInputSpecSlicer, times(1)).sliceStatic(any(), any(), 
anyInt());
 
     Assert.assertEquals(
         ImmutableMap.<Integer, List<InputSlice>>builder()
@@ -511,7 +513,7 @@ public class WorkerInputsTest
     );
 
     Mockito.verify(testInputSpecSlicer, 
times(1)).canSliceDynamic(inputSpecToSplit);
-    Mockito.verify(testInputSpecSlicer, times(1)).sliceDynamic(any(), 
anyInt(), anyInt(), anyLong());
+    Mockito.verify(testInputSpecSlicer, times(1)).sliceDynamic(any(), any(), 
anyInt(), anyInt(), anyLong());
 
     Assert.assertEquals(
         ImmutableMap.<Integer, List<InputSlice>>builder()
@@ -657,7 +659,7 @@ public class WorkerInputsTest
     }
 
     @Override
-    public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+    public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable 
SegmentPruner segmentPruner, int maxNumSlices)
     {
       final TestInputSpec testInputSpec = (TestInputSpec) inputSpec;
       final List<List<Long>> assignments =
@@ -672,6 +674,7 @@ public class WorkerInputsTest
     @Override
     public List<InputSlice> sliceDynamic(
         InputSpec inputSpec,
+        @Nullable SegmentPruner segmentPruner,
         int maxNumSlices,
         int maxFilesPerSlice,
         long maxBytesPerSlice
diff --git 
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
 
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
index 814f0809393..cea0ddfa84a 100644
--- 
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
+++ 
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
@@ -23,14 +23,7 @@ order by 1;
     "input" : [ {
       "type" : "table",
       "dataSource" : "wikipedia",
-      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
-      "filter" : {
-        "type" : "inType",
-        "column" : "cityName",
-        "matchValueType" : "STRING",
-        "sortedValues" : [ "Aarhus", "New York" ]
-      },
-      "filterFields" : [ "cityName" ]
+      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
     } ],
     "processor" : {
       "type" : "groupByPreShuffle",
diff --git 
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq
 
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq
index 3dfead41f01..1de39db46fc 100644
--- 
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq
+++ 
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq
@@ -13,14 +13,7 @@ where w1.cityName='New York';
     "input" : [ {
       "type" : "table",
       "dataSource" : "wikipedia",
-      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
-      "filter" : {
-        "type" : "equals",
-        "column" : "cityName",
-        "matchValueType" : "STRING",
-        "matchValue" : "New York"
-      },
-      "filterFields" : [ "cityName" ]
+      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
     } ],
     "processor" : {
       "type" : "scan",
@@ -254,14 +247,7 @@ LogicalJoin:[[broadcast inheritPath:[0, 0]]]
     "input" : [ {
       "type" : "table",
       "dataSource" : "wikipedia",
-      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
-      "filter" : {
-        "type" : "equals",
-        "column" : "cityName",
-        "matchValueType" : "STRING",
-        "matchValue" : "New York"
-      },
-      "filterFields" : [ "cityName" ]
+      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
     } ],
     "processor" : {
       "type" : "scan",
@@ -496,14 +482,7 @@ LogicalJoin:[[sort_merge inheritPath:[0, 0]]]
     "input" : [ {
       "type" : "table",
       "dataSource" : "wikipedia",
-      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
-      "filter" : {
-        "type" : "equals",
-        "column" : "cityName",
-        "matchValueType" : "STRING",
-        "matchValue" : "New York"
-      },
-      "filterFields" : [ "cityName" ]
+      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
     } ],
     "processor" : {
       "type" : "scan",
diff --git 
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqNestedJoinHint.iq
 
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqNestedJoinHint.iq
index 617ae774dc2..b71933ba6dc 100644
--- 
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqNestedJoinHint.iq
+++ 
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqNestedJoinHint.iq
@@ -143,14 +143,7 @@ where w1.cityName='New York';
     "input" : [ {
       "type" : "table",
       "dataSource" : "wikipedia",
-      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
-      "filter" : {
-        "type" : "equals",
-        "column" : "cityName",
-        "matchValueType" : "STRING",
-        "matchValue" : "New York"
-      },
-      "filterFields" : [ "cityName" ]
+      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
     }, {
       "type" : "stage",
       "stage" : 0
diff --git 
a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java 
b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
index 29babc115d0..ad364a754f0 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
@@ -19,21 +19,8 @@
 
 package org.apache.druid.query.filter;
 
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.RangeSet;
-import org.apache.druid.timeline.partition.ShardSpec;
-
-import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 
 /**
  *
@@ -90,86 +77,4 @@ public class DimFilterUtils
     }
     return retVal.array();
   }
-
-  /**
-   * Filter the given iterable of objects by removing any object whose 
ShardSpec, obtained from the converter function,
-   * does not fit in the RangeSet of the dimFilter {@link 
DimFilter#getDimensionRangeSet(String)}. The returned set
-   * contains the filtered objects in the same order as they appear in input.
-   *
-   * DimensionRangedCache stores the RangeSets of different dimensions for the 
dimFilter. It should be re-used
-   * between calls with the same dimFilter to save redundant calls of {@link 
DimFilter#getDimensionRangeSet(String)}
-   * on same dimensions.
-   *
-   * @param dimFilter           The filter to use
-   * @param filterFields        Set of fields to consider for pruning, or null 
to consider all fields
-   * @param input               The iterable of objects to be filtered
-   * @param converter           The function to convert T to ShardSpec that 
can be filtered by
-   * @param dimensionRangeCache The cache of RangeSets of different dimensions 
for the dimFilter
-   * @param <T>                 This can be any type, as long as transform 
function is provided to convert this to ShardSpec
-   *
-   * @return The set of filtered object, in the same order as input
-   */
-  public static <T> Set<T> filterShards(
-      @Nullable final DimFilter dimFilter,
-      @Nullable final Set<String> filterFields,
-      final Iterable<T> input,
-      final Function<T, ShardSpec> converter,
-      final Map<String, Optional<RangeSet<String>>> dimensionRangeCache
-  )
-  {
-    if (dimFilter == null) {
-      // ImmutableSet retains order from "input".
-      return ImmutableSet.copyOf(input);
-    }
-
-    // LinkedHashSet retains order from "input".
-    Set<T> retSet = new LinkedHashSet<>();
-
-    for (T obj : input) {
-      ShardSpec shard = converter.apply(obj);
-      boolean include = true;
-
-      if (shard != null) {
-        Map<String, RangeSet<String>> filterDomain = new HashMap<>();
-        List<String> dimensions = shard.getDomainDimensions();
-        for (String dimension : dimensions) {
-          if (filterFields == null || filterFields.contains(dimension)) {
-            Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
-                .computeIfAbsent(dimension, d -> 
Optional.ofNullable(dimFilter.getDimensionRangeSet(d)));
-
-            if (optFilterRangeSet.isPresent()) {
-              filterDomain.put(dimension, optFilterRangeSet.get());
-            }
-          }
-        }
-        if (!filterDomain.isEmpty() && !shard.possibleInDomain(filterDomain)) {
-          include = false;
-        }
-      }
-
-      if (include) {
-        retSet.add(obj);
-      }
-    }
-    return retSet;
-  }
-
-  /**
-   * Returns a copy of "fields" filtered by the predicate function.
-   */
-  public static Set<String> onlyBaseFields(
-      final Set<String> fields,
-      final Predicate<String> isBaseColumnFn
-  )
-  {
-    final Set<String> retVal = new HashSet<>();
-
-    for (final String field : fields) {
-      if (isBaseColumnFn.apply(field)) {
-        retVal.add(field);
-      }
-    }
-
-    return retVal;
-  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
 
b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
new file mode 100644
index 00000000000..6dce9018b7a
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.google.common.collect.RangeSet;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.ShardSpec;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+  * Uses a {@link DimFilter} to check the {@link 
DimFilter#getDimensionRangeSet(String)} against
+ * {@link ShardSpec#possibleInDomain(Map)} in order to 'prune' a set of 
segments whose rows would never match a filter
+ * and avoid processing those segments in the first place.
+  */
+public class FilterSegmentPruner implements SegmentPruner
+{
+  private final DimFilter filter;
+  private final Set<String> filterFields;
+  private final Map<String, Optional<RangeSet<String>>> rangeCache;
+
+  public FilterSegmentPruner(
+      DimFilter filter,
+      @Nullable Set<String> filterFields
+  )
+  {
+    InvalidInput.conditionalException(filter != null, "filter must not be 
null");
+    this.filter = filter;
+    this.filterFields = filterFields == null ? filter.getRequiredColumns() : 
filterFields;
+    this.rangeCache = new HashMap<>();
+  }
+
+   /**
+    * Filter the given iterable of objects by removing any object whose {@link 
DataSegment}, obtained from the converter
+    * function, does not fit in the RangeSet of the dimFilter {@link 
DimFilter#getDimensionRangeSet(String)}. The
+    * returned set contains the filtered objects in the same order as they 
appear in input.
+    *
+    * {@link #rangeCache} stores the RangeSets of different dimensions for the 
filter, so it can be re-used between
+    * calls to save redundant evaluation of {@link 
DimFilter#getDimensionRangeSet(String)} on the same columns.
+    *
+    * @param input      The iterable of objects to be filtered
+    * @param converter  The function to convert T to {@link DataSegment} that 
can be filtered by
+    * @param <T>        This can be any type, as long as transform function is 
provided to extract a {@link DataSegment}
+    *
+    * @return The set of pruned object, in the same order as input
+    */
+  @Override
+  public <T> Collection<T> prune(Iterable<T> input, Function<T, DataSegment> 
converter)
+  {
+    // LinkedHashSet retains order from "input".
+    final Set<T> retSet = new LinkedHashSet<>();
+
+    for (T obj : input) {
+      final DataSegment segment = converter.apply(obj);
+      if (segment == null) {
+        continue;
+      }
+      final ShardSpec shard = segment.getShardSpec();
+      boolean include = true;
+
+      if (shard != null) {
+        Map<String, RangeSet<String>> filterDomain = new HashMap<>();
+        List<String> dimensions = shard.getDomainDimensions();
+        for (String dimension : dimensions) {
+          if (filterFields == null || filterFields.contains(dimension)) {
+            Optional<RangeSet<String>> optFilterRangeSet =
+                rangeCache.computeIfAbsent(dimension, d -> 
Optional.ofNullable(filter.getDimensionRangeSet(d)));
+
+            optFilterRangeSet.ifPresent(stringRangeSet -> 
filterDomain.put(dimension, stringRangeSet));
+          }
+        }
+        if (!filterDomain.isEmpty() && !shard.possibleInDomain(filterDomain)) {
+          include = false;
+        }
+      }
+
+      if (include) {
+        retSet.add(obj);
+      }
+    }
+    return retSet;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    FilterSegmentPruner that = (FilterSegmentPruner) o;
+    return Objects.equals(filter, that.filter) && Objects.equals(filterFields, 
that.filterFields);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(filter, filterFields);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "FilterSegmentPruner{" +
+           "filter=" + filter +
+           ", filterFields=" + filterFields +
+           ", rangeCache=" + rangeCache +
+           '}';
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java 
b/processing/src/main/java/org/apache/druid/query/filter/SegmentPruner.java
similarity index 51%
copy from 
multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
copy to 
processing/src/main/java/org/apache/druid/query/filter/SegmentPruner.java
index 90a6970485e..4b372271659 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/SegmentPruner.java
@@ -17,24 +17,18 @@
  * under the License.
  */
 
-package org.apache.druid.msq.input;
+package org.apache.druid.query.filter;
 
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.timeline.DataSegment;
 
-/**
- * Slice of an {@link InputSpec} assigned to a particular worker.
- *
- * On the controller, these are produced using {@link InputSpecSlicer}. On 
workers, these are transformed into
- * {@link ReadableSlice} using {@link InputSliceReader}.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-public interface InputSlice
+import java.util.Collection;
+import java.util.function.Function;
+
+public interface SegmentPruner
 {
   /**
-   * Returns the number of files contained within this split. This is the same 
number that would be added to
-   * {@link org.apache.druid.msq.counters.CounterTracker} on full iteration 
through {@link InputSliceReader#attach}.
-   *
-   * May be zero for some kinds of slices, even if they contain data, if the 
input is not file-based.
+   * Filter the given {@link Iterable} of objects containing a {@link 
DataSegment} (obtained from the converter
+   * function), to reduce the overall working set which need to be processed.
    */
-  int fileCount();
+  <T> Collection<T> prune(Iterable<T> input, Function<T, DataSegment> 
converter);
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java 
b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
index b12dfd88e4d..d5baa72dd08 100644
--- 
a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
+++ 
b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
@@ -31,6 +31,8 @@ import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.filter.FilterSegmentPruner;
+import org.apache.druid.query.filter.SegmentPruner;
 import org.apache.druid.query.policy.PolicyEnforcer;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -39,8 +41,11 @@ import org.apache.druid.segment.SegmentMapFunction;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.join.JoinPrefixUtils;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Represents the native engine's execution vertex - the execution unit it may 
execute in one execution cycle.
@@ -82,6 +87,31 @@ import java.util.List;
  */
 public class ExecutionVertex
 {
+  /**
+   * Identifies the vertex for the given query.
+   */
+  public static ExecutionVertex of(Query<?> query)
+  {
+    ExecutionVertexExplorer explorer = new ExecutionVertexExplorer(query);
+    return new ExecutionVertex(explorer);
+  }
+
+  /**
+   * Builds the {@link ExecutionVertex} around a {@link DataSource}.
+   *
+   * Kept for backward compatibility reasons - incorporating
+   * {@link ExecutionVertex} into Filtration will make this obsolete.
+   */
+  public static ExecutionVertex ofDataSource(DataSource dataSource)
+  {
+    ScanQuery query = Druids
+        .newScanQueryBuilder()
+        .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
+        .dataSource(dataSource)
+        .build();
+    return ExecutionVertex.of(query);
+  }
+
   /** The top level query this vertex is describing. */
   protected final Query<?> topQuery;
   /** The base datasource which will be read during the execution.  */
@@ -90,6 +120,7 @@ public class ExecutionVertex
   protected final QuerySegmentSpec querySegmentSpec;
   /** Retained for compatibility with earlier implementation. See {@link 
#isBaseColumn(String)} */
   protected final List<String> joinPrefixes;
+
   /** Retained for compatibility with earlier implementation. */
   protected boolean allRightsAreGlobal;
 
@@ -102,15 +133,6 @@ public class ExecutionVertex
     this.allRightsAreGlobal = explorer.allRightsAreGlobal;
   }
 
-  /**
-   * Identifies the vertex for the given query.
-   */
-  public static ExecutionVertex of(Query<?> query)
-  {
-    ExecutionVertexExplorer explorer = new ExecutionVertexExplorer(query);
-    return new ExecutionVertex(explorer);
-  }
-
   /**
    * The base datasource input of this vertex.
    */
@@ -139,6 +161,141 @@ public class ExecutionVertex
            || (baseDataSource instanceof UnionDataSource && ((UnionDataSource) 
baseDataSource).isTableBased());
   }
 
+  /**
+   * Unwraps the {@link #getBaseDataSource()} if its a {@link TableDataSource}.
+   *
+   * @throws DruidException error of type {@link 
DruidException.Category#DEFENSIVE} if the {@link #getBaseDataSource()}
+   * is not a table. Note that this may not be true even {@link 
#isProcessable()} ()} is true - in cases when the base
+   * datasource is a {@link UnionDataSource} of {@link TableDataSource}.
+   */
+  public final TableDataSource getBaseTableDataSource()
+  {
+    if (baseDataSource instanceof TableDataSource) {
+      return (TableDataSource) baseDataSource;
+    } else {
+      throw DruidException.defensive("Base dataSource[%s] is not a table!", 
baseDataSource);
+    }
+  }
+
+  /**
+   * The applicable {@link QuerySegmentSpec} for this vertex.
+   *
+   * There might be more queries inside a single vertex; so the outer one is 
not
+   * necessary correct.
+   */
+  public QuerySegmentSpec getEffectiveQuerySegmentSpec()
+  {
+    Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec is null!");
+    return querySegmentSpec;
+  }
+
+  /**
+   * Decides if the query can be executed using the cluster walker.
+   */
+  public boolean canRunQueryUsingClusterWalker()
+  {
+    return isProcessable() && isTableBased();
+  }
+
+  /**
+   * Decides if the query can be executed using the local walker.
+   */
+  public boolean canRunQueryUsingLocalWalker()
+  {
+    return isProcessable() && !isTableBased();
+  }
+
+  /**
+   * Decides if the execution time segment mapping function will be expensive.
+   */
+  public boolean isSegmentMapFunctionExpensive()
+  {
+    boolean hasJoin = !joinPrefixes.isEmpty();
+    return hasJoin;
+  }
+
+  @Nullable
+  public SegmentPruner getSegmentPruner()
+  {
+    if (!topQuery.context().isSecondaryPartitionPruningEnabled()) {
+      return null;
+    }
+    if (topQuery.getFilter() == null) {
+      return null;
+    }
+    final Set<String> baseFields = new HashSet<>();
+    for (final String field : topQuery.getFilter().getRequiredColumns()) {
+      if (isBaseColumn(field)) {
+        baseFields.add(field);
+      }
+    }
+
+    return new FilterSegmentPruner(
+        topQuery.getFilter(),
+        baseFields
+    );
+  }
+
+  /**
+   * Answers if the given column is coming from the base datasource or not.
+   *
+   * Retained for backward compatibility for now. The approach taken here 
relies
+   * on join prefixes - which might classify the output of a
+   * {@link VirtualColumn} to be coming from the base datasource. <br/>
+   * An alternate approach would be to analyze these during the segmentmap
+   * function creation.
+   */
+  public boolean isBaseColumn(String columnName)
+  {
+    for (String prefix : joinPrefixes) {
+      if (JoinPrefixUtils.isPrefixedBy(columnName, prefix)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Query buildQueryWithBaseDataSource(DataSource newBaseDataSource)
+  {
+    return new ReplaceBaseDataSource(baseDataSource, 
newBaseDataSource).traverse(topQuery);
+  }
+
+  /**
+   * Assembles the segment mapping function which should be applied to the 
input segments.
+   */
+  public SegmentMapFunction createSegmentMapFunction(PolicyEnforcer 
policyEnforcer)
+  {
+    return 
getTopDataSource().createSegmentMapFunction(topQuery).thenMap(segment -> {
+      segment.validateOrElseThrow(policyEnforcer);
+      return segment;
+    });
+  }
+
+  /**
+   * Returns the first datasource which is not collapsible by the topQuery.
+   */
+  private DataSource getTopDataSource()
+  {
+    Query<?> q = topQuery;
+    while (q.mayCollapseQueryDataSource()) {
+      q = ((QueryDataSource) q.getDataSource()).getQuery();
+    }
+    return q.getDataSource();
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int hashCode()
+  {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Explores the vertex and collects all details.
    */
@@ -233,100 +390,6 @@ public class ExecutionVertex
     }
   }
 
-  /**
-   * Builds the {@link ExecutionVertex} around a {@link DataSource}.
-   *
-   * Kept for backward compatibility reasons - incorporating
-   * {@link ExecutionVertex} into Filtration will make this obsolete.
-   */
-  public static ExecutionVertex ofDataSource(DataSource dataSource)
-  {
-    ScanQuery query = Druids
-        .newScanQueryBuilder()
-        .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
-        .dataSource(dataSource)
-        .build();
-    return ExecutionVertex.of(query);
-  }
-
-  /**
-   * Unwraps the {@link #getBaseDataSource()} if its a {@link TableDataSource}.
-   *
-   * @throws DruidException error of type {@link 
DruidException.Category#DEFENSIVE} if the {@link #getBaseDataSource()}
-   * is not a table. Note that this may not be true even {@link 
#isProcessable()} ()} is true - in cases when the base
-   * datasource is a {@link UnionDataSource} of {@link TableDataSource}.
-   */
-  public final TableDataSource getBaseTableDataSource()
-  {
-    if (baseDataSource instanceof TableDataSource) {
-      return (TableDataSource) baseDataSource;
-    } else {
-      throw DruidException.defensive("Base dataSource[%s] is not a table!", 
baseDataSource);
-    }
-  }
-
-  /**
-   * The applicable {@link QuerySegmentSpec} for this vertex.
-   *
-   * There might be more queries inside a single vertex; so the outer one is 
not
-   * necessary correct.
-   */
-  public QuerySegmentSpec getEffectiveQuerySegmentSpec()
-  {
-    Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec is null!");
-    return querySegmentSpec;
-  }
-
-  /**
-   * Decides if the query can be executed using the cluster walker.
-   */
-  public boolean canRunQueryUsingClusterWalker()
-  {
-    return isProcessable() && isTableBased();
-  }
-
-  /**
-   * Decides if the query can be executed using the local walker.
-   */
-  public boolean canRunQueryUsingLocalWalker()
-  {
-    return isProcessable() && !isTableBased();
-  }
-
-  /**
-   * Decides if the execution time segment mapping function will be expensive.
-   */
-  public boolean isSegmentMapFunctionExpensive()
-  {
-    boolean hasJoin = !joinPrefixes.isEmpty();
-    return hasJoin;
-  }
-
-  /**
-   * Answers if the given column is coming from the base datasource or not.
-   *
-   * Retained for backward compatibility for now. The approach taken here 
relies
-   * on join prefixes - which might classify the output of a
-   * {@link VirtualColumn} to be coming from the base datasource. <br/>
-   * An alternate approach would be to analyze these during the segmentmap
-   * function creation.
-   */
-  public boolean isBaseColumn(String columnName)
-  {
-    for (String prefix : joinPrefixes) {
-      if (JoinPrefixUtils.isPrefixedBy(columnName, prefix)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @SuppressWarnings("rawtypes")
-  public Query buildQueryWithBaseDataSource(DataSource newBaseDataSource)
-  {
-    return new ReplaceBaseDataSource(baseDataSource, 
newBaseDataSource).traverse(topQuery);
-  }
-
   /**
    * Replaces the base datasource of the given query.
    */
@@ -368,39 +431,4 @@ public class ExecutionVertex
       return query;
     }
   }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int hashCode()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Assembles the segment mapping function which should be applied to the 
input segments.
-   */
-  public SegmentMapFunction createSegmentMapFunction(PolicyEnforcer 
policyEnforcer)
-  {
-    return 
getTopDataSource().createSegmentMapFunction(topQuery).thenMap(segment -> {
-      segment.validateOrElseThrow(policyEnforcer);
-      return segment;
-    });
-  }
-
-  /**
-   * Returns the first datasource which is not collapsible by the topQuery.
-   */
-  private DataSource getTopDataSource()
-  {
-    Query<?> q = topQuery;
-    while (q.mayCollapseQueryDataSource()) {
-      q = ((QueryDataSource) q.getDataSource()).getQuery();
-    }
-    return q.getDataSource();
-  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
 
b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
deleted file mode 100644
index 179abe02121..00000000000
--- 
a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.filter;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableRangeSet;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Range;
-import com.google.common.collect.RangeSet;
-import org.apache.druid.timeline.partition.ShardSpec;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-public class DimFilterUtilsTest
-{
-  private static final Function<ShardSpec, ShardSpec> CONVERTER = new 
Function<>()
-  {
-    @Nullable
-    @Override
-    public ShardSpec apply(@Nullable ShardSpec input)
-    {
-      return input;
-    }
-  };
-
-  @Test
-  public void testFilterShards()
-  {
-    DimFilter filter1 = EasyMock.createMock(DimFilter.class);
-    EasyMock.expect(filter1.getDimensionRangeSet("dim1"))
-            .andReturn(rangeSet(ImmutableList.of(Range.lessThan("abc"))))
-            .anyTimes();
-    EasyMock.expect(filter1.getDimensionRangeSet("dim2"))
-            .andReturn(null)
-            .anyTimes();
-
-    ShardSpec shard1 = shardSpec("dim1", true);
-    ShardSpec shard2 = shardSpec("dim1", false);
-    ShardSpec shard3 = shardSpec("dim1", false);
-    ShardSpec shard4 = shardSpec("dim2", false);
-    ShardSpec shard5 = shardSpec("dim2", false);
-    ShardSpec shard6 = shardSpec("dim2", false);
-    ShardSpec shard7 = shardSpec("dim2", false);
-
-    List<ShardSpec> shards = ImmutableList.of(shard1, shard2, shard3, shard4, 
shard5, shard6, shard7);
-    EasyMock.replay(filter1, shard1, shard2, shard3, shard4, shard5, shard6, 
shard7);
-
-    Set<ShardSpec> expected1 = ImmutableSet.of(shard1, shard4, shard5, shard6, 
shard7);
-    assertFilterResult(filter1, null, shards, expected1);
-    assertFilterResult(filter1, Collections.singleton("dim1"), shards, 
expected1);
-    assertFilterResult(filter1, Collections.singleton("dim2"), shards, 
ImmutableSet.copyOf(shards));
-    assertFilterResult(filter1, Collections.emptySet(), shards, 
ImmutableSet.copyOf(shards));
-  }
-
-  private void assertFilterResult(
-      DimFilter filter,
-      Set<String> filterFields,
-      Iterable<ShardSpec> input,
-      Set<ShardSpec> expected
-  )
-  {
-    Set<ShardSpec> result = new HashSet<>();
-    Map<String, Optional<RangeSet<String>>> dimensionRangeMap = new 
HashMap<>();
-    for (ShardSpec shard : input) {
-      result.addAll(
-          DimFilterUtils.filterShards(
-              filter,
-              filterFields,
-              ImmutableList.of(shard),
-              CONVERTER,
-              dimensionRangeMap
-          )
-      );
-    }
-    Assert.assertEquals(expected, result);
-  }
-
-  private static RangeSet<String> rangeSet(List<Range<String>> ranges)
-  {
-    ImmutableRangeSet.Builder<String> builder = ImmutableRangeSet.builder();
-    for (Range<String> range : ranges) {
-      builder.add(range);
-    }
-    return builder.build();
-  }
-
-  private static ShardSpec shardSpec(String dimension, boolean contained)
-  {
-    ShardSpec shard = EasyMock.createMock(ShardSpec.class);
-    EasyMock.expect(shard.getDomainDimensions())
-            .andReturn(ImmutableList.of(dimension))
-            .anyTimes();
-    EasyMock.expect(shard.possibleInDomain(EasyMock.anyObject()))
-            .andReturn(contained)
-            .anyTimes();
-    return shard;
-  }
-}
diff --git 
a/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
 
b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
new file mode 100644
index 00000000000..131337ad468
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+class FilterSegmentPrunerTest
+{
+  @Test
+  void testNullFilter()
+  {
+    Throwable t = Assertions.assertThrows(
+        DruidException.class,
+        () -> new FilterSegmentPruner(null, null)
+    );
+    Assertions.assertEquals("filter must not be null", t.getMessage());
+  }
+
+  @Test
+  void testPrune()
+  {
+    DimFilter range_a = new RangeFilter("dim1", ColumnType.STRING, null, 
"aaa", null, null, null);
+    DimFilter expression_b = new ExpressionDimFilter("dim2 == 'c'", null, 
TestExprMacroTable.INSTANCE);
+
+    String interval1 = "2026-02-18T00:00:00Z/2026-02-19T00:00:00Z";
+    String interval2 = "2026-02-19T00:00:00Z/2026-02-20T00:00:00Z";
+
+    DataSegment seg1 = makeDataSegment(interval1, makeRange("dim1", 0, null, 
"abc"));
+    DataSegment seg2 = makeDataSegment(interval1, makeRange("dim1", 1, "abc", 
"lmn"));
+    DataSegment seg3 = makeDataSegment(interval1, makeRange("dim1", 2, "lmn", 
null));
+    DataSegment seg4 = makeDataSegment(interval2, makeRange("dim2", 0, null, 
"b"));
+    DataSegment seg5 = makeDataSegment(interval2, makeRange("dim2", 1, "b", 
"j"));
+    DataSegment seg6 = makeDataSegment(interval2, makeRange("dim2", 2, "j", 
"s"));
+    DataSegment seg7 = makeDataSegment(interval2, makeRange("dim2", 3, "s", 
"t"));
+
+    List<DataSegment> segs = List.of(seg1, seg2, seg3, seg4, seg5, seg6, seg7);
+
+    FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null);
+    FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, 
Collections.emptySet());
+    FilterSegmentPruner prunerExpression = new 
FilterSegmentPruner(expression_b, null);
+
+    Assertions.assertEquals(Set.of(seg1, seg4, seg5, seg6, seg7), 
prunerRange.prune(segs, Function.identity()));
+    Assertions.assertEquals(Set.copyOf(segs), prunerExpression.prune(segs, 
Function.identity()));
+    Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, 
Function.identity()));
+  }
+
+  @Test
+  void testEqualsAndHashcode()
+  {
+    
EqualsVerifier.forClass(FilterSegmentPruner.class).usingGetClass().withIgnoredFields("rangeCache").verify();
+  }
+
+  private ShardSpec makeRange(
+      String column,
+      int partitionNumber,
+      @Nullable String start,
+      @Nullable String end
+  )
+  {
+    return makeRange(
+        List.of(column),
+        partitionNumber,
+        start == null ? null : StringTuple.create(start),
+        end == null ? null : StringTuple.create(end)
+    );
+  }
+
+  private ShardSpec makeRange(
+      List<String> columns,
+      int partitionNumber,
+      @Nullable StringTuple start,
+      @Nullable StringTuple end
+  )
+  {
+    return new DimensionRangeShardSpec(
+        columns,
+        start,
+        end,
+        partitionNumber,
+        0
+    );
+  }
+
+  private DataSegment makeDataSegment(String intervalString, ShardSpec 
shardSpec)
+  {
+    Interval interval = Intervals.of(intervalString);
+    return DataSegment.builder(SegmentId.of("prune-test", interval, "0", 
shardSpec))
+                      .shardSpec(shardSpec)
+                      .build();
+  }
+}
diff --git 
a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/qaWin/sql_explain.msq.iq
 
b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/qaWin/sql_explain.msq.iq
index 2f1a50c377b..9891f57e668 100644
--- 
a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/qaWin/sql_explain.msq.iq
+++ 
b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/qaWin/sql_explain.msq.iq
@@ -66,14 +66,7 @@ WHERE client_ip IN ('107.13.54.103',
     "input" : [ {
       "type" : "table",
       "dataSource" : "test_win",
-      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
-      "filter" : {
-        "type" : "inType",
-        "column" : "client_ip",
-        "matchValueType" : "STRING",
-        "sortedValues" : [ "107.13.54.103", "99.9.55.22" ]
-      },
-      "filterFields" : [ "client_ip" ]
+      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
     } ],
     "processor" : {
       "type" : "groupByPreShuffle",
@@ -726,14 +719,7 @@ WHERE client_ip IN ('107.13.54.103',
     "input" : [ {
       "type" : "table",
       "dataSource" : "test_win",
-      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
-      "filter" : {
-        "type" : "inType",
-        "column" : "client_ip",
-        "matchValueType" : "STRING",
-        "sortedValues" : [ "107.13.54.103", "99.9.55.22" ]
-      },
-      "filterFields" : [ "client_ip" ]
+      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
     } ],
     "processor" : {
       "type" : "groupByPreShuffle",
@@ -1380,14 +1366,7 @@ GROUP BY server_ip,
     "input" : [ {
       "type" : "table",
       "dataSource" : "test_win",
-      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
-      "filter" : {
-        "type" : "inType",
-        "column" : "client_ip",
-        "matchValueType" : "STRING",
-        "sortedValues" : [ "107.13.54.103", "99.9.55.22" ]
-      },
-      "filterFields" : [ "client_ip" ]
+      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
     } ],
     "processor" : {
       "type" : "groupByPreShuffle",
@@ -2017,14 +1996,7 @@ GROUP BY server_ip,
     "input" : [ {
       "type" : "table",
       "dataSource" : "test_win",
-      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
-      "filter" : {
-        "type" : "inType",
-        "column" : "client_ip",
-        "matchValueType" : "STRING",
-        "sortedValues" : [ "107.13.54.103", "99.9.55.22" ]
-      },
-      "filterFields" : [ "client_ip" ]
+      "intervals" : [ 
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
     } ],
     "processor" : {
       "type" : "groupByPreShuffle",
diff --git 
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java 
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 33237f372a2..eb0a5a83997 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
-import com.google.common.collect.RangeSet;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
@@ -72,7 +71,7 @@ import org.apache.druid.query.Result;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.aggregation.MetricManipulatorFns;
 import org.apache.druid.query.context.ResponseContext;
-import org.apache.druid.query.filter.DimFilterUtils;
+import org.apache.druid.query.filter.SegmentPruner;
 import org.apache.druid.query.planning.ExecutionVertex;
 import org.apache.druid.server.QueryResource;
 import org.apache.druid.server.QueryScheduler;
@@ -91,6 +90,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -442,32 +442,16 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
       );
 
       final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
-      final Map<String, Optional<RangeSet<String>>> dimensionRangeCache;
-      final Set<String> filterFieldsForPruning;
-
-      final boolean trySecondaryPartititionPruning =
-          query.getFilter() != null && 
query.context().isSecondaryPartitionPruningEnabled();
-
-      if (trySecondaryPartititionPruning) {
-        dimensionRangeCache = new HashMap<>();
-        filterFieldsForPruning =
-            
DimFilterUtils.onlyBaseFields(query.getFilter().getRequiredColumns(), 
ev::isBaseColumn);
-      } else {
-        dimensionRangeCache = null;
-        filterFieldsForPruning = null;
-      }
+      final SegmentPruner segmentPruner = ev.getSegmentPruner();
 
       boolean isRealtimeSegmentOnly = query.context().isRealtimeSegmentsOnly();
       // Filter unneeded chunks based on partition dimension
       for (TimelineObjectHolder<String, ServerSelector> holder : 
serversLookup) {
-        final Set<PartitionChunk<ServerSelector>> filteredChunks;
-        if (trySecondaryPartititionPruning) {
-          filteredChunks = DimFilterUtils.filterShards(
-              query.getFilter(),
-              filterFieldsForPruning,
+        final Collection<PartitionChunk<ServerSelector>> filteredChunks;
+        if (segmentPruner != null) {
+          filteredChunks = segmentPruner.prune(
               holder.getObject(),
-              partitionChunk -> 
partitionChunk.getObject().getSegment().getShardSpec(),
-              dimensionRangeCache
+              partitionChunk -> partitionChunk.getObject().getSegment()
           );
         } else {
           filteredChunks = Sets.newLinkedHashSet(holder.getObject());
diff --git a/web-console/src/druid-models/stages/stages.mock.ts 
b/web-console/src/druid-models/stages/stages.mock.ts
index 8b056c1f3ae..c10c5794571 100644
--- a/web-console/src/druid-models/stages/stages.mock.ts
+++ b/web-console/src/druid-models/stages/stages.mock.ts
@@ -215,13 +215,6 @@ export const STAGES = new Stages(
             type: 'table',
             dataSource: 'kttm_simple',
             intervals: 
['-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z'],
-            filter: {
-              type: 'equals',
-              column: 'os',
-              matchValueType: 'STRING',
-              matchValue: 'iOS',
-            },
-            filterFields: ['os'],
           },
           {
             type: 'stage',
diff --git a/web-console/src/druid-models/stages/stages.ts 
b/web-console/src/druid-models/stages/stages.ts
index cbb67a26b5b..6831891a8b0 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -71,8 +71,6 @@ export type StageInput =
       type: 'table';
       dataSource: string;
       intervals: string[];
-      filter?: any;
-      filterFields?: string[];
     }
   | {
       type: 'external';
diff --git 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
index 26a1f005ff6..ff43e3f8f99 100644
--- 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
+++ 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
@@ -19,7 +19,6 @@
 import { Button, Icon, Intent } from '@blueprintjs/core';
 import { IconNames } from '@blueprintjs/icons';
 import classNames from 'classnames';
-import * as JSONBig from 'json-bigint-native';
 import React from 'react';
 import type { Column } from 'react-table';
 import ReactTable from 'react-table';
@@ -78,7 +77,6 @@ function summarizeTableInput(tableStageInput: StageInput): 
string {
   return assemble(
     `Datasource: ${tableStageInput.dataSource}`,
     `Interval: ${tableStageInput.intervals.join('; ')}`,
-    tableStageInput.filter && `Filter: 
${JSONBig.stringify(tableStageInput.filter)}`,
   ).join('\n');
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to