This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 98f1eb8ede5 Use filters for pruning properly for hash-joins. (#15299)
98f1eb8ede5 is described below

commit 98f1eb8ede58400416049db925faee287e3fcc74
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Nov 3 07:29:16 2023 -0700

    Use filters for pruning properly for hash-joins. (#15299)
    
    * Use filters for pruning properly for hash-joins.
    
    Native used them too aggressively: it might use filters for the RHS
    to prune the LHS. MSQ used them not at all. Now, both use them properly,
    pruning based on base (LHS) columns only.
    
    * Fix tests.
    
    * Fix style.
    
    * Clear filterFields too.
    
    * Update.
---
 .../druid/msq/input/table/TableInputSpec.java      | 42 ++++++++----
 .../msq/input/table/TableInputSpecSlicer.java      |  5 +-
 .../apache/druid/msq/querykit/DataSourcePlan.java  | 75 ++++++++++++++++++++--
 .../msq/querykit/groupby/GroupByQueryKit.java      |  1 +
 .../druid/msq/querykit/scan/ScanQueryKit.java      |  1 +
 .../msq/input/table/TableInputSpecSlicerTest.java  | 59 +++++++++++++++--
 .../druid/msq/input/table/TableInputSpecTest.java  | 25 +++++++-
 .../apache/druid/query/filter/DimFilterUtils.java  | 59 +++++++++--------
 .../druid/query/planning/DataSourceAnalysis.java   | 19 ++++++
 .../druid/query/filter/DimFilterUtilsTest.java     | 28 ++++++--
 .../query/planning/DataSourceAnalysisTest.java     | 33 ++++++++++
 .../druid/client/CachingClusteredClient.java       | 21 +++++-
 12 files changed, 307 insertions(+), 61 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
index b3337f4307c..2691d827f89 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
@@ -32,6 +32,7 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Input spec representing a Druid table.
@@ -45,28 +46,35 @@ public class TableInputSpec implements InputSpec
   @Nullable
   private final DimFilter filter;
 
+  @Nullable
+  private final Set<String> filterFields;
+
   /**
    * Create a table input spec.
    *
-   * @param dataSource datasource to read
-   * @param intervals  intervals to filter, or null if no time filtering is 
desired. Interval filtering is strict,
-   *                   meaning that when this spec is sliced and read, the 
returned {@link SegmentWithDescriptor}
-   *                   from {@link ReadableInput#getSegment()} are clipped to 
these intervals.
-   * @param filter     other filters to use for pruning, or null if no pruning 
is desired. Pruning filters are
-   *                   *not strict*, which means that processors must re-apply 
them when processing the returned
-   *                   {@link SegmentWithDescriptor} from {@link 
ReadableInput#getSegment()}. This matches how
-   *                   Broker-based pruning works for native queries.
+   * @param dataSource   datasource to read
+   * @param intervals    intervals to filter, or null if no time filtering is 
desired. Interval filtering is strict,
+   *                     meaning that when this spec is sliced and read, the 
returned {@link SegmentWithDescriptor}
+   *                     from {@link ReadableInput#getSegment()} are clipped 
to these intervals.
+   * @param filter       other filters to use for pruning, or null if no 
pruning is desired. Pruning filters are
+   *                     *not strict*, which means that processors must 
re-apply them when processing the returned
+   *                     {@link SegmentWithDescriptor} from {@link 
ReadableInput#getSegment()}. This matches how
+   *                     Broker-based pruning works for native queries.
+   * @param filterFields list of fields from {@link 
DimFilter#getRequiredColumns()} to consider for pruning. If null,
+   *                     all fields are considered for pruning.
    */
   @JsonCreator
   public TableInputSpec(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("intervals") @Nullable List<Interval> intervals,
-      @JsonProperty("filter") @Nullable DimFilter filter
+      @JsonProperty("filter") @Nullable DimFilter filter,
+      @JsonProperty("filterFields") @Nullable Set<String> filterFields
   )
   {
     this.dataSource = dataSource;
     this.intervals = intervals == null ? Intervals.ONLY_ETERNITY : intervals;
     this.filter = filter;
+    this.filterFields = filterFields;
   }
 
   @JsonProperty
@@ -97,6 +105,14 @@ public class TableInputSpec implements InputSpec
     return filter;
   }
 
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public Set<String> getFilterFields()
+  {
+    return filterFields;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -109,13 +125,14 @@ public class TableInputSpec implements InputSpec
     TableInputSpec that = (TableInputSpec) o;
     return Objects.equals(dataSource, that.dataSource)
            && Objects.equals(intervals, that.intervals)
-           && Objects.equals(filter, that.filter);
+           && Objects.equals(filter, that.filter)
+           && Objects.equals(filterFields, that.filterFields);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(dataSource, intervals, filter);
+    return Objects.hash(dataSource, intervals, filter, filterFields);
   }
 
   @Override
@@ -124,7 +141,8 @@ public class TableInputSpec implements InputSpec
     return "TableInputSpec{" +
            "dataSource='" + dataSource + '\'' +
            ", intervals=" + intervals +
-           ", filter=" + filter +
+           (filter == null ? "" : ", filter=" + filter) +
+           (filterFields == null ? "" : ", filterFields=" + filterFields) +
            '}';
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
index 91f2e681e1e..1cd82f726ed 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
@@ -33,6 +33,7 @@ import org.joda.time.Interval;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -115,8 +116,10 @@ public class TableInputSpecSlicer implements 
InputSpecSlicer
 
       return DimFilterUtils.filterShards(
           tableInputSpec.getFilter(),
+          tableInputSpec.getFilterFields(),
           () -> dataSegmentIterator,
-          segment -> segment.getSegment().getShardSpec()
+          segment -> segment.getSegment().getShardSpec(),
+          new HashMap<>()
       );
     }
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index 16eaef63c49..566b084ad36 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -54,6 +54,7 @@ import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.UnionDataSource;
 import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.planning.PreJoinableClause;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -74,8 +75,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+/**
+ * Plan for getting data from a {@link DataSource}. Used by {@link QueryKit} 
implementations.
+ */
 public class DataSourcePlan
 {
   /**
@@ -116,6 +121,22 @@ public class DataSourcePlan
     }
   }
 
+  /**
+   * Build a plan.
+   *
+   * @param queryKit         query kit reference for recursive planning
+   * @param queryId          query ID
+   * @param queryContext     query context
+   * @param dataSource       datasource to plan
+   * @param querySegmentSpec intervals for mandatory pruning. Must be {@link 
MultipleIntervalSegmentSpec}. The returned
+   *                         plan is guaranteed to be filtered to this 
interval.
+   * @param filter           filter for best-effort pruning. The returned plan 
may or may not be filtered to this
+   *                         filter. Query processing must still apply the 
filter to generate correct results.
+   * @param filterFields     which fields from the filter to consider for 
pruning, or null to consider all fields.
+   * @param maxWorkerCount   maximum number of workers for subqueries
+   * @param minStageNumber   starting stage number for subqueries
+   * @param broadcast        whether the plan should broadcast data for this 
datasource
+   */
   @SuppressWarnings("rawtypes")
   public static DataSourcePlan forDataSource(
       final QueryKit queryKit,
@@ -124,13 +145,32 @@ public class DataSourcePlan
       final DataSource dataSource,
       final QuerySegmentSpec querySegmentSpec,
       @Nullable DimFilter filter,
+      @Nullable Set<String> filterFields,
       final int maxWorkerCount,
       final int minStageNumber,
       final boolean broadcast
   )
   {
+    if (!queryContext.isSecondaryPartitionPruningEnabled()) {
+      // Clear filter, we don't want to prune today.
+      filter = null;
+      filterFields = null;
+    }
+
+    if (filter != null && filterFields == null) {
+      // Ensure filterFields is nonnull if filter is nonnull. Helps for other 
forXYZ methods, so they don't need to
+      // deal with the case where filter is nonnull but filterFields is null.
+      filterFields = filter.getRequiredColumns();
+    }
+
     if (dataSource instanceof TableDataSource) {
-      return forTable((TableDataSource) dataSource, 
querySegmentSpecIntervals(querySegmentSpec), filter, broadcast);
+      return forTable(
+          (TableDataSource) dataSource,
+          querySegmentSpecIntervals(querySegmentSpec),
+          filter,
+          filterFields,
+          broadcast
+      );
     } else if (dataSource instanceof ExternalDataSource) {
       checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forExternal((ExternalDataSource) dataSource, broadcast);
@@ -179,6 +219,7 @@ public class DataSourcePlan
           (UnionDataSource) dataSource,
           querySegmentSpec,
           filter,
+          filterFields,
           maxWorkerCount,
           minStageNumber,
           broadcast
@@ -198,6 +239,8 @@ public class DataSourcePlan
               queryContext,
               (JoinDataSource) dataSource,
               querySegmentSpec,
+              filter,
+              filterFields,
               maxWorkerCount,
               minStageNumber,
               broadcast
@@ -222,21 +265,36 @@ public class DataSourcePlan
     }
   }
 
+  /**
+   * Possibly remapped datasource that should be used when processing. Will be 
either the original datasource, or the
+   * original datasource with itself or some children replaced by {@link 
InputNumberDataSource}. Any added
+   * {@link InputNumberDataSource} refer to {@link StageInputSpec} in {@link 
#getInputSpecs()}.
+   */
   public DataSource getNewDataSource()
   {
     return newDataSource;
   }
 
+  /**
+   * Input specs that should be used when processing.
+   */
   public List<InputSpec> getInputSpecs()
   {
     return inputSpecs;
   }
 
+  /**
+   * Which input specs from {@link #getInputSpecs()} are broadcast.
+   */
   public IntSet getBroadcastInputs()
   {
     return broadcastInputs;
   }
 
+  /**
+   * Returns a {@link QueryDefinitionBuilder} that includes any {@link 
StageInputSpec} from {@link #getInputSpecs()}.
+   * Absent if this plan does not involve reading from prior stages.
+   */
   public Optional<QueryDefinitionBuilder> getSubQueryDefBuilder()
   {
     return Optional.ofNullable(subQueryDefBuilder);
@@ -302,12 +360,13 @@ public class DataSourcePlan
       final TableDataSource dataSource,
       final List<Interval> intervals,
       @Nullable final DimFilter filter,
+      @Nullable final Set<String> filterFields,
       final boolean broadcast
   )
   {
     return new DataSourcePlan(
         (broadcast && dataSource.isGlobal()) ? dataSource : new 
InputNumberDataSource(0),
-        Collections.singletonList(new TableInputSpec(dataSource.getName(), 
intervals, filter)),
+        Collections.singletonList(new TableInputSpec(dataSource.getName(), 
intervals, filter, filterFields)),
         broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
         null
     );
@@ -407,6 +466,7 @@ public class DataSourcePlan
         dataSource.getBase(),
         querySegmentSpec,
         null,
+        null,
         maxWorkerCount,
         minStageNumber,
         broadcast
@@ -447,6 +507,7 @@ public class DataSourcePlan
         dataSource.getBase(),
         querySegmentSpec,
         null,
+        null,
         maxWorkerCount,
         minStageNumber,
         broadcast
@@ -478,6 +539,7 @@ public class DataSourcePlan
       final UnionDataSource unionDataSource,
       final QuerySegmentSpec querySegmentSpec,
       @Nullable DimFilter filter,
+      @Nullable Set<String> filterFields,
       final int maxWorkerCount,
       final int minStageNumber,
       final boolean broadcast
@@ -499,6 +561,7 @@ public class DataSourcePlan
           child,
           querySegmentSpec,
           filter,
+          filterFields,
           maxWorkerCount,
           Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
           broadcast
@@ -528,6 +591,8 @@ public class DataSourcePlan
       final QueryContext queryContext,
       final JoinDataSource dataSource,
       final QuerySegmentSpec querySegmentSpec,
+      @Nullable final DimFilter filter,
+      @Nullable final Set<String> filterFields,
       final int maxWorkerCount,
       final int minStageNumber,
       final boolean broadcast
@@ -542,7 +607,8 @@ public class DataSourcePlan
         queryContext,
         analysis.getBaseDataSource(),
         querySegmentSpec,
-        null, // Don't push query filters down through a join: this needs some 
work to ensure pruning works properly.
+        filter,
+        filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, 
analysis),
         maxWorkerCount,
         Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
         broadcast
@@ -561,7 +627,8 @@ public class DataSourcePlan
           queryContext,
           clause.getDataSource(),
           new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY),
-          null, // Don't push query filters down through a join: this needs 
some work to ensure pruning works properly.
+          null, // Don't push down query filters for right-hand side: needs 
some work to ensure it works properly.
+          null,
           maxWorkerCount,
           Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
           true // Always broadcast right-hand side of the join.
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index b12048a315f..469a8a8aa46 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -84,6 +84,7 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
         originalQuery.getDataSource(),
         originalQuery.getQuerySegmentSpec(),
         originalQuery.getFilter(),
+        null,
         maxWorkerCount,
         minStageNumber,
         false
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
index 681d0ae9c00..986554f86c8 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
@@ -95,6 +95,7 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
         originalQuery.getDataSource(),
         originalQuery.getQuerySegmentSpec(),
         originalQuery.getFilter(),
+        null,
         maxWorkerCount,
         minStageNumber,
         false
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
index fd5db7e75f6..67d5040d447 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
@@ -112,13 +112,13 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
   @Test
   public void test_canSliceDynamic()
   {
-    Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, 
null, null)));
+    Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, 
null, null, null)));
   }
 
   @Test
   public void test_sliceStatic_noDataSource()
   {
-    final TableInputSpec spec = new TableInputSpec("no such datasource", null, 
null);
+    final TableInputSpec spec = new TableInputSpec("no such datasource", null, 
null, null);
     Assert.assertEquals(
         ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
         slicer.sliceStatic(spec, 2)
@@ -134,6 +134,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
             Intervals.of("2000/P1M"),
             Intervals.of("2000-06-01/P1M")
         ),
+        null,
         null
     );
 
@@ -183,6 +184,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         Collections.singletonList(Intervals.of("2002/P1M")),
+        null,
         null
     );
 
@@ -198,7 +200,8 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         null,
-        new SelectorDimFilter("dim", "bar", null)
+        new SelectorDimFilter("dim", "bar", null),
+        null
     );
 
     Assert.assertEquals(
@@ -221,6 +224,42 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     );
   }
 
+  @Test
+  public void test_sliceStatic_dimFilterNotUsed()
+  {
+    final TableInputSpec spec = new TableInputSpec(
+        DATASOURCE,
+        null,
+        new SelectorDimFilter("dim", "bar", null),
+        Collections.emptySet()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new SegmentsInputSlice(
+                DATASOURCE,
+                ImmutableList.of(
+                    new RichSegmentDescriptor(
+                        SEGMENT1.getInterval(),
+                        SEGMENT1.getInterval(),
+                        SEGMENT1.getVersion(),
+                        SEGMENT1.getShardSpec().getPartitionNum(),
+                        null
+                    ),
+                    new RichSegmentDescriptor(
+                        SEGMENT2.getInterval(),
+                        SEGMENT2.getInterval(),
+                        SEGMENT2.getVersion(),
+                        SEGMENT2.getShardSpec().getPartitionNum(),
+                        null
+                    )
+                )
+            )
+        ),
+        slicer.sliceStatic(spec, 1)
+    );
+  }
+
   @Test
   public void test_sliceStatic_intervalAndDimFilter()
   {
@@ -230,7 +269,8 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
             Intervals.of("2000/P1M"),
             Intervals.of("2000-06-01/P1M")
         ),
-        new SelectorDimFilter("dim", "bar", null)
+        new SelectorDimFilter("dim", "bar", null),
+        null
     );
 
     Assert.assertEquals(
@@ -267,7 +307,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
   @Test
   public void test_sliceStatic_oneSlice()
   {
-    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
+    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, 
null);
     Assert.assertEquals(
         Collections.singletonList(
             new SegmentsInputSlice(
@@ -297,7 +337,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
   @Test
   public void test_sliceStatic_needTwoSlices()
   {
-    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
+    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, 
null);
     Assert.assertEquals(
         ImmutableList.of(
             new SegmentsInputSlice(
@@ -332,7 +372,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
   @Test
   public void test_sliceStatic_threeSlices()
   {
-    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
+    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, 
null);
     Assert.assertEquals(
         ImmutableList.of(
             new SegmentsInputSlice(
@@ -371,6 +411,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2002/P1M")),
+        null,
         null
     );
 
@@ -386,6 +427,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
+        null,
         null
     );
 
@@ -421,6 +463,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
+        null,
         null
     );
 
@@ -456,6 +499,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
+        null,
         null
     );
 
@@ -496,6 +540,7 @@ public class TableInputSpecSlicerTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
+        null,
         null
     );
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
index 10aba22939b..2313063ac36 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
@@ -43,7 +43,27 @@ public class TableInputSpecTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         "myds",
         Collections.singletonList(Intervals.of("2000/P1M")),
-        new SelectorDimFilter("dim", "val", null)
+        new SelectorDimFilter("dim", "val", null),
+        Collections.singleton("dim")
+    );
+
+    Assert.assertEquals(
+        spec,
+        mapper.readValue(mapper.writeValueAsString(spec), InputSpec.class)
+    );
+  }
+
+  @Test
+  public void testSerdeEmptyFilterFields() throws Exception
+  {
+    final ObjectMapper mapper = TestHelper.makeJsonMapper()
+                                          .registerModules(new 
MSQIndexingModule().getJacksonModules());
+
+    final TableInputSpec spec = new TableInputSpec(
+        "myds",
+        Collections.singletonList(Intervals.of("2000/P1M")),
+        new SelectorDimFilter("dim", "val", null),
+        Collections.emptySet()
     );
 
     Assert.assertEquals(
@@ -61,7 +81,8 @@ public class TableInputSpecTest extends 
InitializedNullHandlingTest
     final TableInputSpec spec = new TableInputSpec(
         "myds",
         Intervals.ONLY_ETERNITY,
-        new SelectorDimFilter("dim", "val", null)
+        new SelectorDimFilter("dim", "val", null),
+        null
     );
 
     Assert.assertEquals(
diff --git 
a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java 
b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
index ed03efac38a..090ab26d5e2 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
@@ -21,10 +21,13 @@ package org.apache.druid.query.filter;
 
 import com.google.common.base.Function;
 import com.google.common.collect.RangeSet;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.timeline.partition.ShardSpec;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -86,26 +89,6 @@ public class DimFilterUtils
     return retVal.array();
   }
 
-  /**
-   * Filter the given iterable of objects by removing any object whose 
ShardSpec, obtained from the converter function,
-   * does not fit in the RangeSet of the dimFilter {@link 
DimFilter#getDimensionRangeSet(String)}. The returned set
-   * contains the filtered objects in the same order as they appear in input.
-   *
-   * If you plan to call this multiple times with the same dimFilter, consider 
using
-   * {@link #filterShards(DimFilter, Iterable, Function, Map)} instead with a 
cached map
-   *
-   * @param dimFilter The filter to use
-   * @param input     The iterable of objects to be filtered
-   * @param converter The function to convert T to ShardSpec that can be 
filtered by
-   * @param <T>       This can be any type, as long as transform function is 
provided to convert this to ShardSpec
-   *
-   * @return The set of filtered object, in the same order as input
-   */
-  public static <T> Set<T> filterShards(DimFilter dimFilter, Iterable<T> 
input, Function<T, ShardSpec> converter)
-  {
-    return filterShards(dimFilter, input, converter, new HashMap<>());
-  }
-
   /**
    * Filter the given iterable of objects by removing any object whose 
ShardSpec, obtained from the converter function,
    * does not fit in the RangeSet of the dimFilter {@link 
DimFilter#getDimensionRangeSet(String)}. The returned set
@@ -116,6 +99,7 @@ public class DimFilterUtils
    * on same dimensions.
    *
    * @param dimFilter           The filter to use
+   * @param filterFields        Set of fields to consider for pruning, or null 
to consider all fields
    * @param input               The iterable of objects to be filtered
    * @param converter           The function to convert T to ShardSpec that 
can be filtered by
    * @param dimensionRangeCache The cache of RangeSets of different dimensions 
for the dimFilter
@@ -124,7 +108,8 @@ public class DimFilterUtils
    * @return The set of filtered object, in the same order as input
    */
   public static <T> Set<T> filterShards(
-      final DimFilter dimFilter,
+      @Nullable final DimFilter dimFilter,
+      @Nullable final Set<String> filterFields,
       final Iterable<T> input,
       final Function<T, ShardSpec> converter,
       final Map<String, Optional<RangeSet<String>>> dimensionRangeCache
@@ -140,11 +125,13 @@ public class DimFilterUtils
         Map<String, RangeSet<String>> filterDomain = new HashMap<>();
         List<String> dimensions = shard.getDomainDimensions();
         for (String dimension : dimensions) {
-          Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
-              .computeIfAbsent(dimension, d -> 
Optional.ofNullable(dimFilter.getDimensionRangeSet(d)));
+          if (filterFields == null || filterFields.contains(dimension)) {
+            Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
+                .computeIfAbsent(dimension, d -> 
Optional.ofNullable(dimFilter.getDimensionRangeSet(d)));
 
-          if (optFilterRangeSet.isPresent()) {
-            filterDomain.put(dimension, optFilterRangeSet.get());
+            if (optFilterRangeSet.isPresent()) {
+              filterDomain.put(dimension, optFilterRangeSet.get());
+            }
           }
         }
         if (!filterDomain.isEmpty() && !shard.possibleInDomain(filterDomain)) {
@@ -158,4 +145,26 @@ public class DimFilterUtils
     }
     return retSet;
   }
+
+  /**
+   * Returns a copy of "fields" only including base fields from {@link 
DataSourceAnalysis}.
+   *
+   * @param fields             field list, must be nonnull
+   * @param dataSourceAnalysis analyzed datasource
+   */
+  public static Set<String> onlyBaseFields(
+      final Set<String> fields,
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+    final Set<String> retVal = new HashSet<>();
+
+    for (final String field : fields) {
+      if (dataSourceAnalysis.isBaseColumn(field)) {
+        retVal.add(field);
+      }
+    }
+
+    return retVal;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
 
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index f17ab6aec23..8d3f741087f 100644
--- 
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++ 
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -29,6 +29,7 @@ import org.apache.druid.query.UnionDataSource;
 import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.join.JoinPrefixUtils;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -247,6 +248,24 @@ public class DataSourceAnalysis
     return !preJoinableClauses.isEmpty();
   }
 
+  /**
+   * Returns whether "column" on the analyzed datasource refers to a column 
from the base datasource.
+   */
+  public boolean isBaseColumn(final String column)
+  {
+    if (baseQuery != null) {
+      return false;
+    }
+
+    for (final PreJoinableClause clause : preJoinableClauses) {
+      if (JoinPrefixUtils.isPrefixedBy(column, clause.getPrefix())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
 
b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
index 778db21b0a3..13e15891d2f 100644
--- 
a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
@@ -31,6 +31,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -73,18 +74,31 @@ public class DimFilterUtilsTest
     EasyMock.replay(filter1, shard1, shard2, shard3, shard4, shard5, shard6, 
shard7);
 
     Set<ShardSpec> expected1 = ImmutableSet.of(shard1, shard4, shard5, shard6, 
shard7);
-    assertFilterResult(filter1, shards, expected1);
+    assertFilterResult(filter1, null, shards, expected1);
+    assertFilterResult(filter1, Collections.singleton("dim1"), shards, 
expected1);
+    assertFilterResult(filter1, Collections.singleton("dim2"), shards, 
ImmutableSet.copyOf(shards));
+    assertFilterResult(filter1, Collections.emptySet(), shards, 
ImmutableSet.copyOf(shards));
   }
 
-  private void assertFilterResult(DimFilter filter, Iterable<ShardSpec> input, 
Set<ShardSpec> expected)
+  private void assertFilterResult(
+      DimFilter filter,
+      Set<String> filterFields,
+      Iterable<ShardSpec> input,
+      Set<ShardSpec> expected
+  )
   {
-    Set<ShardSpec> result = DimFilterUtils.filterShards(filter, input, 
CONVERTER);
-    Assert.assertEquals(expected, result);
-
+    Set<ShardSpec> result = new HashSet<>();
     Map<String, Optional<RangeSet<String>>> dimensionRangeMap = new 
HashMap<>();
-    result = new HashSet<>();
     for (ShardSpec shard : input) {
-      result.addAll(DimFilterUtils.filterShards(filter, 
ImmutableList.of(shard), CONVERTER, dimensionRangeMap));
+      result.addAll(
+          DimFilterUtils.filterShards(
+              filter,
+              filterFields,
+              ImmutableList.of(shard),
+              CONVERTER,
+              dimensionRangeMap
+          )
+      );
     }
     Assert.assertEquals(expected, result);
   }
diff --git 
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
 
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
index 1302e504dc9..1240115221d 100644
--- 
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
@@ -74,6 +74,7 @@ public class DataSourceAnalysisTest
     Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
     Assert.assertEquals(Collections.emptyList(), 
analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -92,6 +93,7 @@ public class DataSourceAnalysisTest
     Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
     Assert.assertEquals(Collections.emptyList(), 
analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -113,6 +115,7 @@ public class DataSourceAnalysisTest
     );
     Assert.assertEquals(Collections.emptyList(), 
analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertFalse(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -135,6 +138,7 @@ public class DataSourceAnalysisTest
     );
     Assert.assertEquals(Collections.emptyList(), 
analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertFalse(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -152,6 +156,7 @@ public class DataSourceAnalysisTest
     Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
     Assert.assertEquals(Collections.emptyList(), 
analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -173,6 +178,7 @@ public class DataSourceAnalysisTest
     );
     Assert.assertEquals(Collections.emptyList(), 
analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertFalse(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -190,6 +196,7 @@ public class DataSourceAnalysisTest
     Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
     Assert.assertEquals(Collections.emptyList(), 
analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -237,6 +244,10 @@ public class DataSourceAnalysisTest
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("2.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("3.foo"));
   }
 
   @Test
@@ -282,6 +293,10 @@ public class DataSourceAnalysisTest
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("2.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("3.foo"));
   }
 
   @Test
@@ -331,6 +346,10 @@ public class DataSourceAnalysisTest
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertTrue(analysis.isBaseColumn("1.foo"));
+    Assert.assertTrue(analysis.isBaseColumn("2.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("3.foo"));
   }
 
   @Test
@@ -376,6 +395,10 @@ public class DataSourceAnalysisTest
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertTrue(analysis.isBaseColumn("1.foo"));
+    Assert.assertTrue(analysis.isBaseColumn("2.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("3.foo"));
   }
 
   @Test
@@ -405,6 +428,8 @@ public class DataSourceAnalysisTest
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
   }
 
   @Test
@@ -436,6 +461,8 @@ public class DataSourceAnalysisTest
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
   }
 
   @Test
@@ -488,6 +515,8 @@ public class DataSourceAnalysisTest
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertFalse(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
   }
 
   @Test
@@ -518,6 +547,8 @@ public class DataSourceAnalysisTest
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
   }
 
   @Test
@@ -548,6 +579,8 @@ public class DataSourceAnalysisTest
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
   }
 
   @Test
diff --git 
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java 
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 19df276344e..892e39e9007 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -436,19 +436,34 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
       );
 
       final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
-      final Map<String, Optional<RangeSet<String>>> dimensionRangeCache = new 
HashMap<>();
+      final Map<String, Optional<RangeSet<String>>> dimensionRangeCache;
+      final Set<String> filterFieldsForPruning;
+
+      final boolean trySecondaryPartititionPruning =
+          query.getFilter() != null && 
query.context().isSecondaryPartitionPruningEnabled();
+
+      if (trySecondaryPartititionPruning) {
+        dimensionRangeCache = new HashMap<>();
+        filterFieldsForPruning =
+            
DimFilterUtils.onlyBaseFields(query.getFilter().getRequiredColumns(), 
dataSourceAnalysis);
+      } else {
+        dimensionRangeCache = null;
+        filterFieldsForPruning = null;
+      }
+
       // Filter unneeded chunks based on partition dimension
       for (TimelineObjectHolder<String, ServerSelector> holder : 
serversLookup) {
         final Set<PartitionChunk<ServerSelector>> filteredChunks;
-        if (query.context().isSecondaryPartitionPruningEnabled()) {
+        if (trySecondaryPartititionPruning) {
           filteredChunks = DimFilterUtils.filterShards(
               query.getFilter(),
+              filterFieldsForPruning,
               holder.getObject(),
               partitionChunk -> 
partitionChunk.getObject().getSegment().getShardSpec(),
               dimensionRangeCache
           );
         } else {
-          filteredChunks = Sets.newHashSet(holder.getObject());
+          filteredChunks = Sets.newLinkedHashSet(holder.getObject());
         }
         for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
           ServerSelector server = chunk.getObject();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to