This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 98f1eb8ede5 Use filters for pruning properly for hash-joins. (#15299)
98f1eb8ede5 is described below
commit 98f1eb8ede58400416049db925faee287e3fcc74
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Nov 3 07:29:16 2023 -0700
Use filters for pruning properly for hash-joins. (#15299)
* Use filters for pruning properly for hash-joins.
Native used them too aggressively: it might use filters for the RHS
to prune the LHS. MSQ used them not at all. Now, both use them properly,
pruning based on base (LHS) columns only.
* Fix tests.
* Fix style.
* Clear filterFields too.
* Update.
---
.../druid/msq/input/table/TableInputSpec.java | 42 ++++++++----
.../msq/input/table/TableInputSpecSlicer.java | 5 +-
.../apache/druid/msq/querykit/DataSourcePlan.java | 75 ++++++++++++++++++++--
.../msq/querykit/groupby/GroupByQueryKit.java | 1 +
.../druid/msq/querykit/scan/ScanQueryKit.java | 1 +
.../msq/input/table/TableInputSpecSlicerTest.java | 59 +++++++++++++++--
.../druid/msq/input/table/TableInputSpecTest.java | 25 +++++++-
.../apache/druid/query/filter/DimFilterUtils.java | 59 +++++++++--------
.../druid/query/planning/DataSourceAnalysis.java | 19 ++++++
.../druid/query/filter/DimFilterUtilsTest.java | 28 ++++++--
.../query/planning/DataSourceAnalysisTest.java | 33 ++++++++++
.../druid/client/CachingClusteredClient.java | 21 +++++-
12 files changed, 307 insertions(+), 61 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
index b3337f4307c..2691d827f89 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
@@ -32,6 +32,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
/**
* Input spec representing a Druid table.
@@ -45,28 +46,35 @@ public class TableInputSpec implements InputSpec
@Nullable
private final DimFilter filter;
+ @Nullable
+ private final Set<String> filterFields;
+
/**
* Create a table input spec.
*
- * @param dataSource datasource to read
- * @param intervals intervals to filter, or null if no time filtering is
desired. Interval filtering is strict,
- * meaning that when this spec is sliced and read, the
returned {@link SegmentWithDescriptor}
- * from {@link ReadableInput#getSegment()} are clipped to
these intervals.
- * @param filter other filters to use for pruning, or null if no pruning
is desired. Pruning filters are
- * *not strict*, which means that processors must re-apply
them when processing the returned
- * {@link SegmentWithDescriptor} from {@link
ReadableInput#getSegment()}. This matches how
- * Broker-based pruning works for native queries.
+ * @param dataSource datasource to read
+ * @param intervals intervals to filter, or null if no time filtering is
desired. Interval filtering is strict,
+ * meaning that when this spec is sliced and read, the
returned {@link SegmentWithDescriptor}
+ * from {@link ReadableInput#getSegment()} are clipped
to these intervals.
+ * @param filter other filters to use for pruning, or null if no
pruning is desired. Pruning filters are
+ * *not strict*, which means that processors must
re-apply them when processing the returned
+ * {@link SegmentWithDescriptor} from {@link
ReadableInput#getSegment()}. This matches how
+ * Broker-based pruning works for native queries.
+ * @param filterFields list of fields from {@link
DimFilter#getRequiredColumns()} to consider for pruning. If null,
+ * all fields are considered for pruning.
*/
@JsonCreator
public TableInputSpec(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("intervals") @Nullable List<Interval> intervals,
- @JsonProperty("filter") @Nullable DimFilter filter
+ @JsonProperty("filter") @Nullable DimFilter filter,
+ @JsonProperty("filterFields") @Nullable Set<String> filterFields
)
{
this.dataSource = dataSource;
this.intervals = intervals == null ? Intervals.ONLY_ETERNITY : intervals;
this.filter = filter;
+ this.filterFields = filterFields;
}
@JsonProperty
@@ -97,6 +105,14 @@ public class TableInputSpec implements InputSpec
return filter;
}
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public Set<String> getFilterFields()
+ {
+ return filterFields;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -109,13 +125,14 @@ public class TableInputSpec implements InputSpec
TableInputSpec that = (TableInputSpec) o;
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(intervals, that.intervals)
- && Objects.equals(filter, that.filter);
+ && Objects.equals(filter, that.filter)
+ && Objects.equals(filterFields, that.filterFields);
}
@Override
public int hashCode()
{
- return Objects.hash(dataSource, intervals, filter);
+ return Objects.hash(dataSource, intervals, filter, filterFields);
}
@Override
@@ -124,7 +141,8 @@ public class TableInputSpec implements InputSpec
return "TableInputSpec{" +
"dataSource='" + dataSource + '\'' +
", intervals=" + intervals +
- ", filter=" + filter +
+ (filter == null ? "" : ", filter=" + filter) +
+ (filterFields == null ? "" : ", filterFields=" + filterFields) +
'}';
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
index 91f2e681e1e..1cd82f726ed 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
@@ -33,6 +33,7 @@ import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -115,8 +116,10 @@ public class TableInputSpecSlicer implements
InputSpecSlicer
return DimFilterUtils.filterShards(
tableInputSpec.getFilter(),
+ tableInputSpec.getFilterFields(),
() -> dataSegmentIterator,
- segment -> segment.getSegment().getShardSpec()
+ segment -> segment.getSegment().getShardSpec(),
+ new HashMap<>()
);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index 16eaef63c49..566b084ad36 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -54,6 +54,7 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -74,8 +75,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
+/**
+ * Plan for getting data from a {@link DataSource}. Used by {@link QueryKit}
implementations.
+ */
public class DataSourcePlan
{
/**
@@ -116,6 +121,22 @@ public class DataSourcePlan
}
}
+ /**
+ * Build a plan.
+ *
+ * @param queryKit query kit reference for recursive planning
+ * @param queryId query ID
+ * @param queryContext query context
+ * @param dataSource datasource to plan
+ * @param querySegmentSpec intervals for mandatory pruning. Must be {@link
MultipleIntervalSegmentSpec}. The returned
+ * plan is guaranteed to be filtered to this
interval.
+ * @param filter filter for best-effort pruning. The returned plan
may or may not be filtered to this
+ * filter. Query processing must still apply the
filter to generate correct results.
+ * @param filterFields which fields from the filter to consider for
pruning, or null to consider all fields.
+ * @param maxWorkerCount maximum number of workers for subqueries
+ * @param minStageNumber starting stage number for subqueries
+ * @param broadcast whether the plan should broadcast data for this
datasource
+ */
@SuppressWarnings("rawtypes")
public static DataSourcePlan forDataSource(
final QueryKit queryKit,
@@ -124,13 +145,32 @@ public class DataSourcePlan
final DataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable DimFilter filter,
+ @Nullable Set<String> filterFields,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
)
{
+ if (!queryContext.isSecondaryPartitionPruningEnabled()) {
+ // Clear filter, we don't want to prune today.
+ filter = null;
+ filterFields = null;
+ }
+
+ if (filter != null && filterFields == null) {
+ // Ensure filterFields is nonnull if filter is nonnull. Helps for other
forXYZ methods, so they don't need to
+ // deal with the case where filter is nonnull but filterFields is null.
+ filterFields = filter.getRequiredColumns();
+ }
+
if (dataSource instanceof TableDataSource) {
- return forTable((TableDataSource) dataSource,
querySegmentSpecIntervals(querySegmentSpec), filter, broadcast);
+ return forTable(
+ (TableDataSource) dataSource,
+ querySegmentSpecIntervals(querySegmentSpec),
+ filter,
+ filterFields,
+ broadcast
+ );
} else if (dataSource instanceof ExternalDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forExternal((ExternalDataSource) dataSource, broadcast);
@@ -179,6 +219,7 @@ public class DataSourcePlan
(UnionDataSource) dataSource,
querySegmentSpec,
filter,
+ filterFields,
maxWorkerCount,
minStageNumber,
broadcast
@@ -198,6 +239,8 @@ public class DataSourcePlan
queryContext,
(JoinDataSource) dataSource,
querySegmentSpec,
+ filter,
+ filterFields,
maxWorkerCount,
minStageNumber,
broadcast
@@ -222,21 +265,36 @@ public class DataSourcePlan
}
}
+ /**
+ * Possibly remapped datasource that should be used when processing. Will be
either the original datasource, or the
+ * original datasource with itself or some children replaced by {@link
InputNumberDataSource}. Any added
+ * {@link InputNumberDataSource} refers to {@link StageInputSpec} in {@link
#getInputSpecs()}.
+ */
public DataSource getNewDataSource()
{
return newDataSource;
}
+ /**
+ * Input specs that should be used when processing.
+ */
public List<InputSpec> getInputSpecs()
{
return inputSpecs;
}
+ /**
+ * Which input specs from {@link #getInputSpecs()} are broadcast.
+ */
public IntSet getBroadcastInputs()
{
return broadcastInputs;
}
+ /**
+ * Returns a {@link QueryDefinitionBuilder} that includes any {@link
StageInputSpec} from {@link #getInputSpecs()}.
+ * Absent if this plan does not involve reading from prior stages.
+ */
public Optional<QueryDefinitionBuilder> getSubQueryDefBuilder()
{
return Optional.ofNullable(subQueryDefBuilder);
@@ -302,12 +360,13 @@ public class DataSourcePlan
final TableDataSource dataSource,
final List<Interval> intervals,
@Nullable final DimFilter filter,
+ @Nullable final Set<String> filterFields,
final boolean broadcast
)
{
return new DataSourcePlan(
(broadcast && dataSource.isGlobal()) ? dataSource : new
InputNumberDataSource(0),
- Collections.singletonList(new TableInputSpec(dataSource.getName(),
intervals, filter)),
+ Collections.singletonList(new TableInputSpec(dataSource.getName(),
intervals, filter, filterFields)),
broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
null
);
@@ -407,6 +466,7 @@ public class DataSourcePlan
dataSource.getBase(),
querySegmentSpec,
null,
+ null,
maxWorkerCount,
minStageNumber,
broadcast
@@ -447,6 +507,7 @@ public class DataSourcePlan
dataSource.getBase(),
querySegmentSpec,
null,
+ null,
maxWorkerCount,
minStageNumber,
broadcast
@@ -478,6 +539,7 @@ public class DataSourcePlan
final UnionDataSource unionDataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable DimFilter filter,
+ @Nullable Set<String> filterFields,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
@@ -499,6 +561,7 @@ public class DataSourcePlan
child,
querySegmentSpec,
filter,
+ filterFields,
maxWorkerCount,
Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
broadcast
@@ -528,6 +591,8 @@ public class DataSourcePlan
final QueryContext queryContext,
final JoinDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
+ @Nullable final DimFilter filter,
+ @Nullable final Set<String> filterFields,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
@@ -542,7 +607,8 @@ public class DataSourcePlan
queryContext,
analysis.getBaseDataSource(),
querySegmentSpec,
- null, // Don't push query filters down through a join: this needs some
work to ensure pruning works properly.
+ filter,
+ filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields,
analysis),
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
broadcast
@@ -561,7 +627,8 @@ public class DataSourcePlan
queryContext,
clause.getDataSource(),
new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY),
- null, // Don't push query filters down through a join: this needs
some work to ensure pruning works properly.
+ null, // Don't push down query filters for right-hand side: needs
some work to ensure it works properly.
+ null,
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
true // Always broadcast right-hand side of the join.
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index b12048a315f..469a8a8aa46 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -84,6 +84,7 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
originalQuery.getFilter(),
+ null,
maxWorkerCount,
minStageNumber,
false
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
index 681d0ae9c00..986554f86c8 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
@@ -95,6 +95,7 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
originalQuery.getFilter(),
+ null,
maxWorkerCount,
minStageNumber,
false
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
index fd5db7e75f6..67d5040d447 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
@@ -112,13 +112,13 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
@Test
public void test_canSliceDynamic()
{
- Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE,
null, null)));
+ Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE,
null, null, null)));
}
@Test
public void test_sliceStatic_noDataSource()
{
- final TableInputSpec spec = new TableInputSpec("no such datasource", null,
null);
+ final TableInputSpec spec = new TableInputSpec("no such datasource", null,
null, null);
Assert.assertEquals(
ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
slicer.sliceStatic(spec, 2)
@@ -134,6 +134,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
Intervals.of("2000/P1M"),
Intervals.of("2000-06-01/P1M")
),
+ null,
null
);
@@ -183,6 +184,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
Collections.singletonList(Intervals.of("2002/P1M")),
+ null,
null
);
@@ -198,7 +200,8 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
null,
- new SelectorDimFilter("dim", "bar", null)
+ new SelectorDimFilter("dim", "bar", null),
+ null
);
Assert.assertEquals(
@@ -221,6 +224,42 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
);
}
+ @Test
+ public void test_sliceStatic_dimFilterNotUsed()
+ {
+ final TableInputSpec spec = new TableInputSpec(
+ DATASOURCE,
+ null,
+ new SelectorDimFilter("dim", "bar", null),
+ Collections.emptySet()
+ );
+
+ Assert.assertEquals(
+ ImmutableList.of(
+ new SegmentsInputSlice(
+ DATASOURCE,
+ ImmutableList.of(
+ new RichSegmentDescriptor(
+ SEGMENT1.getInterval(),
+ SEGMENT1.getInterval(),
+ SEGMENT1.getVersion(),
+ SEGMENT1.getShardSpec().getPartitionNum(),
+ null
+ ),
+ new RichSegmentDescriptor(
+ SEGMENT2.getInterval(),
+ SEGMENT2.getInterval(),
+ SEGMENT2.getVersion(),
+ SEGMENT2.getShardSpec().getPartitionNum(),
+ null
+ )
+ )
+ )
+ ),
+ slicer.sliceStatic(spec, 1)
+ );
+ }
+
@Test
public void test_sliceStatic_intervalAndDimFilter()
{
@@ -230,7 +269,8 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
Intervals.of("2000/P1M"),
Intervals.of("2000-06-01/P1M")
),
- new SelectorDimFilter("dim", "bar", null)
+ new SelectorDimFilter("dim", "bar", null),
+ null
);
Assert.assertEquals(
@@ -267,7 +307,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
@Test
public void test_sliceStatic_oneSlice()
{
- final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
+ final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null,
null);
Assert.assertEquals(
Collections.singletonList(
new SegmentsInputSlice(
@@ -297,7 +337,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
@Test
public void test_sliceStatic_needTwoSlices()
{
- final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
+ final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null,
null);
Assert.assertEquals(
ImmutableList.of(
new SegmentsInputSlice(
@@ -332,7 +372,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
@Test
public void test_sliceStatic_threeSlices()
{
- final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
+ final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null,
null);
Assert.assertEquals(
ImmutableList.of(
new SegmentsInputSlice(
@@ -371,6 +411,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2002/P1M")),
+ null,
null
);
@@ -386,6 +427,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2000/P1M")),
+ null,
null
);
@@ -421,6 +463,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2000/P1M")),
+ null,
null
);
@@ -456,6 +499,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2000/P1M")),
+ null,
null
);
@@ -496,6 +540,7 @@ public class TableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2000/P1M")),
+ null,
null
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
index 10aba22939b..2313063ac36 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
@@ -43,7 +43,27 @@ public class TableInputSpecTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
"myds",
Collections.singletonList(Intervals.of("2000/P1M")),
- new SelectorDimFilter("dim", "val", null)
+ new SelectorDimFilter("dim", "val", null),
+ Collections.singleton("dim")
+ );
+
+ Assert.assertEquals(
+ spec,
+ mapper.readValue(mapper.writeValueAsString(spec), InputSpec.class)
+ );
+ }
+
+ @Test
+ public void testSerdeEmptyFilterFields() throws Exception
+ {
+ final ObjectMapper mapper = TestHelper.makeJsonMapper()
+ .registerModules(new
MSQIndexingModule().getJacksonModules());
+
+ final TableInputSpec spec = new TableInputSpec(
+ "myds",
+ Collections.singletonList(Intervals.of("2000/P1M")),
+ new SelectorDimFilter("dim", "val", null),
+ Collections.emptySet()
);
Assert.assertEquals(
@@ -61,7 +81,8 @@ public class TableInputSpecTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
"myds",
Intervals.ONLY_ETERNITY,
- new SelectorDimFilter("dim", "val", null)
+ new SelectorDimFilter("dim", "val", null),
+ null
);
Assert.assertEquals(
diff --git
a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
index ed03efac38a..090ab26d5e2 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
@@ -21,10 +21,13 @@ package org.apache.druid.query.filter;
import com.google.common.base.Function;
import com.google.common.collect.RangeSet;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.timeline.partition.ShardSpec;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -86,26 +89,6 @@ public class DimFilterUtils
return retVal.array();
}
- /**
- * Filter the given iterable of objects by removing any object whose
ShardSpec, obtained from the converter function,
- * does not fit in the RangeSet of the dimFilter {@link
DimFilter#getDimensionRangeSet(String)}. The returned set
- * contains the filtered objects in the same order as they appear in input.
- *
- * If you plan to call this multiple times with the same dimFilter, consider
using
- * {@link #filterShards(DimFilter, Iterable, Function, Map)} instead with a
cached map
- *
- * @param dimFilter The filter to use
- * @param input The iterable of objects to be filtered
- * @param converter The function to convert T to ShardSpec that can be
filtered by
- * @param <T> This can be any type, as long as transform function is
provided to convert this to ShardSpec
- *
- * @return The set of filtered object, in the same order as input
- */
- public static <T> Set<T> filterShards(DimFilter dimFilter, Iterable<T>
input, Function<T, ShardSpec> converter)
- {
- return filterShards(dimFilter, input, converter, new HashMap<>());
- }
-
/**
* Filter the given iterable of objects by removing any object whose
ShardSpec, obtained from the converter function,
* does not fit in the RangeSet of the dimFilter {@link
DimFilter#getDimensionRangeSet(String)}. The returned set
@@ -116,6 +99,7 @@ public class DimFilterUtils
* on same dimensions.
*
* @param dimFilter The filter to use
+ * @param filterFields Set of fields to consider for pruning, or null
to consider all fields
* @param input The iterable of objects to be filtered
* @param converter The function to convert T to ShardSpec that
can be filtered by
* @param dimensionRangeCache The cache of RangeSets of different dimensions
for the dimFilter
@@ -124,7 +108,8 @@ public class DimFilterUtils
* @return The set of filtered object, in the same order as input
*/
public static <T> Set<T> filterShards(
- final DimFilter dimFilter,
+ @Nullable final DimFilter dimFilter,
+ @Nullable final Set<String> filterFields,
final Iterable<T> input,
final Function<T, ShardSpec> converter,
final Map<String, Optional<RangeSet<String>>> dimensionRangeCache
@@ -140,11 +125,13 @@ public class DimFilterUtils
Map<String, RangeSet<String>> filterDomain = new HashMap<>();
List<String> dimensions = shard.getDomainDimensions();
for (String dimension : dimensions) {
- Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
- .computeIfAbsent(dimension, d ->
Optional.ofNullable(dimFilter.getDimensionRangeSet(d)));
+ if (filterFields == null || filterFields.contains(dimension)) {
+ Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
+ .computeIfAbsent(dimension, d ->
Optional.ofNullable(dimFilter.getDimensionRangeSet(d)));
- if (optFilterRangeSet.isPresent()) {
- filterDomain.put(dimension, optFilterRangeSet.get());
+ if (optFilterRangeSet.isPresent()) {
+ filterDomain.put(dimension, optFilterRangeSet.get());
+ }
}
}
if (!filterDomain.isEmpty() && !shard.possibleInDomain(filterDomain)) {
@@ -158,4 +145,26 @@ public class DimFilterUtils
}
return retSet;
}
+
+ /**
+ * Returns a copy of "fields" only including base fields from {@link
DataSourceAnalysis}.
+ *
+ * @param fields field list, must be nonnull
+ * @param dataSourceAnalysis analyzed datasource
+ */
+ public static Set<String> onlyBaseFields(
+ final Set<String> fields,
+ final DataSourceAnalysis dataSourceAnalysis
+ )
+ {
+ final Set<String> retVal = new HashSet<>();
+
+ for (final String field : fields) {
+ if (dataSourceAnalysis.isBaseColumn(field)) {
+ retVal.add(field);
+ }
+ }
+
+ return retVal;
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index f17ab6aec23..8d3f741087f 100644
---
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -29,6 +29,7 @@ import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.join.JoinPrefixUtils;
import javax.annotation.Nullable;
import java.util.List;
@@ -247,6 +248,24 @@ public class DataSourceAnalysis
return !preJoinableClauses.isEmpty();
}
+ /**
+ * Returns whether "column" on the analyzed datasource refers to a column
from the base datasource.
+ */
+ public boolean isBaseColumn(final String column)
+ {
+ if (baseQuery != null) {
+ return false;
+ }
+
+ for (final PreJoinableClause clause : preJoinableClauses) {
+ if (JoinPrefixUtils.isPrefixedBy(column, clause.getPrefix())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
index 778db21b0a3..13e15891d2f 100644
---
a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
+++
b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
@@ -31,6 +31,7 @@ import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -73,18 +74,31 @@ public class DimFilterUtilsTest
EasyMock.replay(filter1, shard1, shard2, shard3, shard4, shard5, shard6,
shard7);
Set<ShardSpec> expected1 = ImmutableSet.of(shard1, shard4, shard5, shard6,
shard7);
- assertFilterResult(filter1, shards, expected1);
+ assertFilterResult(filter1, null, shards, expected1);
+ assertFilterResult(filter1, Collections.singleton("dim1"), shards,
expected1);
+ assertFilterResult(filter1, Collections.singleton("dim2"), shards,
ImmutableSet.copyOf(shards));
+ assertFilterResult(filter1, Collections.emptySet(), shards,
ImmutableSet.copyOf(shards));
}
- private void assertFilterResult(DimFilter filter, Iterable<ShardSpec> input,
Set<ShardSpec> expected)
+ private void assertFilterResult(
+ DimFilter filter,
+ Set<String> filterFields,
+ Iterable<ShardSpec> input,
+ Set<ShardSpec> expected
+ )
{
- Set<ShardSpec> result = DimFilterUtils.filterShards(filter, input,
CONVERTER);
- Assert.assertEquals(expected, result);
-
+ Set<ShardSpec> result = new HashSet<>();
Map<String, Optional<RangeSet<String>>> dimensionRangeMap = new
HashMap<>();
- result = new HashSet<>();
for (ShardSpec shard : input) {
- result.addAll(DimFilterUtils.filterShards(filter,
ImmutableList.of(shard), CONVERTER, dimensionRangeMap));
+ result.addAll(
+ DimFilterUtils.filterShards(
+ filter,
+ filterFields,
+ ImmutableList.of(shard),
+ CONVERTER,
+ dimensionRangeMap
+ )
+ );
}
Assert.assertEquals(expected, result);
}
diff --git
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
index 1302e504dc9..1240115221d 100644
---
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
+++
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
@@ -74,6 +74,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(),
analysis.getPreJoinableClauses());
Assert.assertFalse(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
}
@Test
@@ -92,6 +93,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(),
analysis.getPreJoinableClauses());
Assert.assertFalse(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
}
@Test
@@ -113,6 +115,7 @@ public class DataSourceAnalysisTest
);
Assert.assertEquals(Collections.emptyList(),
analysis.getPreJoinableClauses());
Assert.assertFalse(analysis.isJoin());
+ Assert.assertFalse(analysis.isBaseColumn("foo"));
}
@Test
@@ -135,6 +138,7 @@ public class DataSourceAnalysisTest
);
Assert.assertEquals(Collections.emptyList(),
analysis.getPreJoinableClauses());
Assert.assertFalse(analysis.isJoin());
+ Assert.assertFalse(analysis.isBaseColumn("foo"));
}
@Test
@@ -152,6 +156,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(),
analysis.getPreJoinableClauses());
Assert.assertFalse(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
}
@Test
@@ -173,6 +178,7 @@ public class DataSourceAnalysisTest
);
Assert.assertEquals(Collections.emptyList(),
analysis.getPreJoinableClauses());
Assert.assertFalse(analysis.isJoin());
+ Assert.assertFalse(analysis.isBaseColumn("foo"));
}
@Test
@@ -190,6 +196,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(),
analysis.getPreJoinableClauses());
Assert.assertFalse(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
}
@Test
@@ -237,6 +244,10 @@ public class DataSourceAnalysisTest
analysis.getPreJoinableClauses()
);
Assert.assertTrue(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ Assert.assertFalse(analysis.isBaseColumn("1.foo"));
+ Assert.assertFalse(analysis.isBaseColumn("2.foo"));
+ Assert.assertFalse(analysis.isBaseColumn("3.foo"));
}
@Test
@@ -282,6 +293,10 @@ public class DataSourceAnalysisTest
analysis.getPreJoinableClauses()
);
Assert.assertTrue(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ Assert.assertFalse(analysis.isBaseColumn("1.foo"));
+ Assert.assertFalse(analysis.isBaseColumn("2.foo"));
+ Assert.assertFalse(analysis.isBaseColumn("3.foo"));
}
@Test
@@ -331,6 +346,10 @@ public class DataSourceAnalysisTest
analysis.getPreJoinableClauses()
);
Assert.assertTrue(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ Assert.assertTrue(analysis.isBaseColumn("1.foo"));
+ Assert.assertTrue(analysis.isBaseColumn("2.foo"));
+ Assert.assertFalse(analysis.isBaseColumn("3.foo"));
}
@Test
@@ -376,6 +395,10 @@ public class DataSourceAnalysisTest
analysis.getPreJoinableClauses()
);
Assert.assertTrue(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ Assert.assertTrue(analysis.isBaseColumn("1.foo"));
+ Assert.assertTrue(analysis.isBaseColumn("2.foo"));
+ Assert.assertFalse(analysis.isBaseColumn("3.foo"));
}
@Test
@@ -405,6 +428,8 @@ public class DataSourceAnalysisTest
analysis.getPreJoinableClauses()
);
Assert.assertTrue(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ Assert.assertFalse(analysis.isBaseColumn("1.foo"));
}
@Test
@@ -436,6 +461,8 @@ public class DataSourceAnalysisTest
analysis.getPreJoinableClauses()
);
Assert.assertTrue(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ Assert.assertFalse(analysis.isBaseColumn("1.foo"));
}
@Test
@@ -488,6 +515,8 @@ public class DataSourceAnalysisTest
analysis.getPreJoinableClauses()
);
Assert.assertTrue(analysis.isJoin());
+ Assert.assertFalse(analysis.isBaseColumn("foo"));
+ Assert.assertFalse(analysis.isBaseColumn("1.foo"));
}
@Test
@@ -518,6 +547,8 @@ public class DataSourceAnalysisTest
analysis.getPreJoinableClauses()
);
Assert.assertTrue(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ Assert.assertFalse(analysis.isBaseColumn("1.foo"));
}
@Test
@@ -548,6 +579,8 @@ public class DataSourceAnalysisTest
analysis.getPreJoinableClauses()
);
Assert.assertTrue(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ Assert.assertFalse(analysis.isBaseColumn("1.foo"));
}
@Test
diff --git
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 19df276344e..892e39e9007 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -436,19 +436,34 @@ public class CachingClusteredClient implements
QuerySegmentWalker
);
final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
- final Map<String, Optional<RangeSet<String>>> dimensionRangeCache = new
HashMap<>();
+ final Map<String, Optional<RangeSet<String>>> dimensionRangeCache;
+ final Set<String> filterFieldsForPruning;
+
+ final boolean trySecondaryPartititionPruning =
+ query.getFilter() != null &&
query.context().isSecondaryPartitionPruningEnabled();
+
+ if (trySecondaryPartititionPruning) {
+ dimensionRangeCache = new HashMap<>();
+ filterFieldsForPruning =
+
DimFilterUtils.onlyBaseFields(query.getFilter().getRequiredColumns(),
dataSourceAnalysis);
+ } else {
+ dimensionRangeCache = null;
+ filterFieldsForPruning = null;
+ }
+
// Filter unneeded chunks based on partition dimension
for (TimelineObjectHolder<String, ServerSelector> holder :
serversLookup) {
final Set<PartitionChunk<ServerSelector>> filteredChunks;
- if (query.context().isSecondaryPartitionPruningEnabled()) {
+ if (trySecondaryPartititionPruning) {
filteredChunks = DimFilterUtils.filterShards(
query.getFilter(),
+ filterFieldsForPruning,
holder.getObject(),
partitionChunk ->
partitionChunk.getObject().getSegment().getShardSpec(),
dimensionRangeCache
);
} else {
- filteredChunks = Sets.newHashSet(holder.getObject());
+ filteredChunks = Sets.newLinkedHashSet(holder.getObject());
}
for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
ServerSelector server = chunk.getObject();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]