This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new cd6bbd53ee4 introduce SegmentPruner interface to abstract segment
pruning operations (#19042)
cd6bbd53ee4 is described below
commit cd6bbd53ee42388ee86678e44b11bafc002687e1
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Feb 24 09:14:39 2026 -0800
introduce SegmentPruner interface to abstract segment pruning operations
(#19042)
changes:
* add new `SegmentPruner` interface as a replacement for
`DimFilterUtils.filterShards` static method
* `SegmentPruner` interface has a single 'prune' method that takes an
`Iterable<T>` and `Function` to extract a `DataSegment` from the iterable
objects, returning a `Collection<T>` of the filtered objects
* `DimFilterUtils.filterShards` has been removed, its logic has been
migrated to `FilterSegmentPruner` implementation of the new `SegmentPruner`
interface
* `TableInputSpec` now has a `SegmentPruner` field instead of separate
`filter` and `filterFields` (these fields still remain for compat purposes)
* transition `DartTableInputSpecSlicer`, `IndexerTableInputSpecSlicer`, and
`CachingClusteredClient` to use `SegmentPruner` (the former 2 via the
`TableInputSpec`)
---
docs/api-reference/sql-api.md | 9 -
.../dart/controller/DartTableInputSpecSlicer.java | 30 +-
.../org/apache/druid/msq/exec/StageProcessor.java | 12 +
.../msq/indexing/IndexerTableInputSpecSlicer.java | 26 +-
.../org/apache/druid/msq/input/InputSlice.java | 2 +-
.../apache/druid/msq/input/InputSpecSlicer.java | 15 +-
.../apache/druid/msq/input/MapInputSpecSlicer.java | 9 +-
.../input/external/ExternalInputSpecSlicer.java | 9 +-
.../msq/input/inline/InlineInputSpecSlicer.java | 5 +-
.../msq/input/lookup/LookupInputSpecSlicer.java | 5 +-
.../msq/input/stage/StageInputSpecSlicer.java | 5 +-
.../druid/msq/input/table/TableInputSpec.java | 59 +---
.../apache/druid/msq/kernel/StageDefinition.java | 11 +
.../druid/msq/kernel/WorkerAssignmentStrategy.java | 10 +-
.../druid/msq/kernel/controller/WorkerInputs.java | 5 +-
.../apache/druid/msq/logical/stages/ReadStage.java | 2 +-
.../druid/msq/querykit/BaseLeafStageProcessor.java | 17 +-
.../apache/druid/msq/querykit/DataSourcePlan.java | 53 +---
.../druid/msq/querykit/WindowOperatorQueryKit.java | 2 -
.../msq/querykit/groupby/GroupByQueryKit.java | 2 -
.../druid/msq/querykit/scan/ScanQueryKit.java | 2 -
.../controller/DartTableInputSpecSlicerTest.java | 35 +--
.../org/apache/druid/msq/input/InputSpecsTest.java | 12 +-
.../external/ExternalInputSpecSlicerTest.java | 28 +-
.../msq/input/stage/StageInputSpecSlicerTest.java | 8 +-
.../table/IndexerTableInputSpecSlicerTest.java | 81 +++---
.../druid/msq/input/table/TableInputSpecTest.java | 43 +--
.../controller/ControllerTestInputSpecSlicer.java | 5 +-
.../msq/kernel/controller/WorkerInputsTest.java | 9 +-
.../msq1.iq | 9 +-
.../msqJoinHint.iq | 27 +-
.../msqNestedJoinHint.iq | 9 +-
.../apache/druid/query/filter/DimFilterUtils.java | 95 -------
.../druid/query/filter/FilterSegmentPruner.java | 136 +++++++++
.../apache/druid/query/filter/SegmentPruner.java | 24 +-
.../druid/query/planning/ExecutionVertex.java | 304 +++++++++++----------
.../druid/query/filter/DimFilterUtilsTest.java | 126 ---------
.../query/filter/FilterSegmentPrunerTest.java | 126 +++++++++
.../qaWin/sql_explain.msq.iq | 36 +--
.../druid/client/CachingClusteredClient.java | 30 +-
web-console/src/druid-models/stages/stages.mock.ts | 7 -
web-console/src/druid-models/stages/stages.ts | 2 -
.../execution-stages-pane.tsx | 2 -
43 files changed, 675 insertions(+), 769 deletions(-)
diff --git a/docs/api-reference/sql-api.md b/docs/api-reference/sql-api.md
index af60cee4c80..37b79e83c3b 100644
--- a/docs/api-reference/sql-api.md
+++ b/docs/api-reference/sql-api.md
@@ -911,15 +911,6 @@ Host: http://ROUTER_IP:ROUTER_PORT
"dataSource": "wikipedia",
"intervals": [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
- ],
- "filter": {
- "type": "equals",
- "column": "user",
- "matchValueType": "STRING",
- "matchValue": "BlueMoon2662"
- },
- "filterFields": [
- "user"
]
}
],
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
index 4b7ea2ed8ee..a9b4bc6b070 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.dart.controller;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.client.QueryableDruidServer;
@@ -44,13 +45,15 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.CloneQueryMode;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.filter.DimFilterUtils;
+import org.apache.druid.query.filter.SegmentPruner;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -122,7 +125,11 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
}
@Override
- public List<InputSlice> sliceStatic(final InputSpec inputSpec, final int
maxNumSlices)
+ public List<InputSlice> sliceStatic(
+ final InputSpec inputSpec,
+ @Nullable final SegmentPruner segmentPruner,
+ final int maxNumSlices
+ )
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
final TimelineLookup<String, ServerSelector> timeline =
@@ -132,9 +139,10 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
return Collections.emptyList();
}
- final Set<DartQueryableSegment> prunedSegments =
+ final Collection<DartQueryableSegment> prunedSegments =
findQueryableDataSegments(
tableInputSpec,
+ segmentPruner,
timeline,
serverSelector -> findWorkerForServerSelector(serverSelector,
maxNumSlices)
);
@@ -183,6 +191,7 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
@Override
public List<InputSlice> sliceDynamic(
final InputSpec inputSpec,
+ @Nullable final SegmentPruner segmentPruner,
final int maxNumSlices,
final int maxFilesPerSlice,
final long maxBytesPerSlice
@@ -222,8 +231,9 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
* Pull the list of {@link DataSegment} that we should query, along with a
clipping interval for each one, and
* a worker to get it from.
*/
- private Set<DartQueryableSegment> findQueryableDataSegments(
+ private Collection<DartQueryableSegment> findQueryableDataSegments(
final TableInputSpec tableInputSpec,
+ @Nullable final SegmentPruner segmentPruner,
final TimelineLookup<?, ServerSelector> timeline,
final ToIntFunction<ServerSelector> toWorkersFunction
)
@@ -242,14 +252,10 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
})
.filter(segment ->
!segment.getSegment().isTombstone())
);
-
- return DimFilterUtils.filterShards(
- tableInputSpec.getFilter(),
- tableInputSpec.getFilterFields(),
- allSegments,
- segment -> segment.getSegment().getShardSpec(),
- new HashMap<>()
- );
+ if (segmentPruner == null) {
+ return ImmutableSet.copyOf(allSegments);
+ }
+ return segmentPruner.prune(allSegments, DartQueryableSegment::getSegment);
}
private DartQueryableSegment toDartQueryableSegment(
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/StageProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/StageProcessor.java
index 0b6ff41f12c..da47511e848 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/StageProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/StageProcessor.java
@@ -24,7 +24,9 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.msq.indexing.processor.SegmentGeneratorStageProcessor;
+import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.query.filter.SegmentPruner;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
@@ -77,4 +79,14 @@ public interface StageProcessor<R, ExtraInfoType>
*/
@SuppressWarnings("rawtypes")
ExtraInfoHolder makeExtraInfoHolder(@Nullable ExtraInfoType extra);
+
+ /**
+ * Produces an optional {@link SegmentPruner}, which can be used for
best-effort pruning of {@link DataSegment} objects
+ * by processors which deal with them
+ */
+ @Nullable
+ default SegmentPruner getPruner(InputSpec inputSpec, int inputNumber)
+ {
+ return null;
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
index a7b8cb6b0a8..ddbfdde9854 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
@@ -44,7 +45,7 @@ import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.filter.DimFilterUtils;
+import org.apache.druid.query.filter.SegmentPruner;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
@@ -96,14 +97,14 @@ public class IndexerTableInputSpecSlicer implements
InputSpecSlicer
}
@Override
- public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+ public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable
SegmentPruner segmentPruner, int maxNumSlices)
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
final List<WeightedInputInstance> prunedPublishedSegments = new
ArrayList<>();
final List<DataSegmentWithInterval> prunedServedSegments = new
ArrayList<>();
- for (DataSegmentWithInterval dataSegmentWithInterval :
getPrunedSegmentSet(tableInputSpec)) {
+ for (DataSegmentWithInterval dataSegmentWithInterval :
getPrunedSegmentSet(tableInputSpec, segmentPruner)) {
if (dataSegmentWithInterval.segment instanceof DataSegmentWithLocation) {
prunedServedSegments.add(dataSegmentWithInterval);
} else {
@@ -125,6 +126,7 @@ public class IndexerTableInputSpecSlicer implements
InputSpecSlicer
@Override
public List<InputSlice> sliceDynamic(
InputSpec inputSpec,
+ @Nullable SegmentPruner segmentPruner,
int maxNumSlices,
int maxFilesPerSlice,
long maxBytesPerSlice
@@ -135,7 +137,7 @@ public class IndexerTableInputSpecSlicer implements
InputSpecSlicer
final List<WeightedInputInstance> prunedSegments = new ArrayList<>();
final List<DataSegmentWithInterval> prunedServedSegments = new
ArrayList<>();
- for (DataSegmentWithInterval dataSegmentWithInterval :
getPrunedSegmentSet(tableInputSpec)) {
+ for (DataSegmentWithInterval dataSegmentWithInterval :
getPrunedSegmentSet(tableInputSpec, segmentPruner)) {
if (dataSegmentWithInterval.segment instanceof DataSegmentWithLocation) {
prunedServedSegments.add(dataSegmentWithInterval);
} else {
@@ -157,7 +159,10 @@ public class IndexerTableInputSpecSlicer implements
InputSpecSlicer
return makeSlices(tableInputSpec, assignments);
}
- private Set<DataSegmentWithInterval> getPrunedSegmentSet(final
TableInputSpec tableInputSpec)
+ private Collection<DataSegmentWithInterval> getPrunedSegmentSet(
+ final TableInputSpec tableInputSpec,
+ @Nullable final SegmentPruner segmentPruner
+ )
{
final TimelineLookup<String, DataSegment> timeline =
getTimeline(tableInputSpec.getDataSource(),
tableInputSpec.getIntervals());
@@ -182,13 +187,10 @@ public class IndexerTableInputSpecSlicer implements
InputSpecSlicer
.map(segment -> new
DataSegmentWithInterval(segment, holder.getInterval()))
).iterator();
- return DimFilterUtils.filterShards(
- tableInputSpec.getFilter(),
- tableInputSpec.getFilterFields(),
- () -> dataSegmentIterator,
- segment -> segment.getSegment().getShardSpec(),
- new HashMap<>()
- );
+ if (segmentPruner == null) {
+ return ImmutableSet.copyOf(dataSegmentIterator);
+ }
+ return segmentPruner.prune(() -> dataSegmentIterator,
DataSegmentWithInterval::getSegment);
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
index 90a6970485e..ccd08bc1f9b 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
* Slice of an {@link InputSpec} assigned to a particular worker.
*
* On the controller, these are produced using {@link InputSpecSlicer}. On
workers, these are transformed into
- * {@link ReadableSlice} using {@link InputSliceReader}.
+ * {@link PhysicalInputSlice} using {@link InputSliceReader}.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface InputSlice
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicer.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicer.java
index ff1808e463c..4e29cd7eb61 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicer.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicer.java
@@ -19,6 +19,9 @@
package org.apache.druid.msq.input;
+import org.apache.druid.query.filter.SegmentPruner;
+
+import javax.annotation.Nullable;
import java.util.List;
/**
@@ -28,7 +31,7 @@ import java.util.List;
public interface InputSpecSlicer
{
/**
- * Whether {@link #sliceDynamic(InputSpec, int, int, long)} is usable for a
given {@link InputSpec}.
+ * Whether {@link #sliceDynamic(InputSpec, SegmentPruner, int, int, long)}
is usable for a given {@link InputSpec}.
*/
boolean canSliceDynamic(InputSpec inputSpec);
@@ -39,7 +42,7 @@ public interface InputSpecSlicer
* This method creates as many slices as possible while staying at or under
maxNumSlices. For example, if a spec
* contains 8 files, and maxNumSlices is 10, then 8 slices will be created.
*/
- List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices);
+ List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable SegmentPruner
segmentPruner, int maxNumSlices);
/**
* Slice a spec based on a particular maximum number of files and bytes per
slice.
@@ -58,5 +61,11 @@ public interface InputSpecSlicer
*
* @throws UnsupportedOperationException if {@link
#canSliceDynamic(InputSpec)} returns false
*/
- List<InputSlice> sliceDynamic(InputSpec inputSpec, int maxNumSlices, int
maxFilesPerSlice, long maxBytesPerSlice);
+ List<InputSlice> sliceDynamic(
+ InputSpec inputSpec,
+ @Nullable SegmentPruner segmentPruner,
+ int maxNumSlices,
+ int maxFilesPerSlice,
+ long maxBytesPerSlice
+ );
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/MapInputSpecSlicer.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/MapInputSpecSlicer.java
index f26a0459c89..9677e0bad96 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/MapInputSpecSlicer.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/MapInputSpecSlicer.java
@@ -21,7 +21,9 @@ package org.apache.druid.msq.input;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.filter.SegmentPruner;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@@ -44,20 +46,21 @@ public class MapInputSpecSlicer implements InputSpecSlicer
}
@Override
- public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+ public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable
SegmentPruner segmentPruner, int maxNumSlices)
{
- return getSlicer(inputSpec.getClass()).sliceStatic(inputSpec,
maxNumSlices);
+ return getSlicer(inputSpec.getClass()).sliceStatic(inputSpec,
segmentPruner, maxNumSlices);
}
@Override
public List<InputSlice> sliceDynamic(
InputSpec inputSpec,
+ @Nullable SegmentPruner segmentPruner,
int maxNumSlices,
int maxFilesPerSlice,
long maxBytesPerSlice
)
{
- return getSlicer(inputSpec.getClass()).sliceDynamic(inputSpec,
maxNumSlices, maxFilesPerSlice, maxBytesPerSlice);
+ return getSlicer(inputSpec.getClass()).sliceDynamic(inputSpec,
segmentPruner, maxNumSlices, maxFilesPerSlice, maxBytesPerSlice);
}
private InputSpecSlicer getSlicer(final Class<? extends InputSpec> clazz)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
index 7c6700ce3a2..8db9d57324e 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
@@ -31,7 +31,9 @@ import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.msq.input.SlicerUtils;
+import org.apache.druid.query.filter.SegmentPruner;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -52,7 +54,7 @@ public class ExternalInputSpecSlicer implements
InputSpecSlicer
}
@Override
- public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+ public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable
SegmentPruner segmentPruner, int maxNumSlices)
{
final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
@@ -70,6 +72,7 @@ public class ExternalInputSpecSlicer implements
InputSpecSlicer
@Override
public List<InputSlice> sliceDynamic(
final InputSpec inputSpec,
+ @Nullable final SegmentPruner segmentPruner,
final int maxNumSlices,
final int maxFilesPerSlice,
final long maxBytesPerSlice
@@ -176,7 +179,7 @@ public class ExternalInputSpecSlicer implements
InputSpecSlicer
}
/**
- * Split hint spec used by {@link #sliceStatic(InputSpec, int)}.
+ * Split hint spec used by {@link #sliceStatic(InputSpec, SegmentPruner,
int)}.
*/
static class StaticSplitHintSpec implements SplitHintSpec
{
@@ -205,7 +208,7 @@ public class ExternalInputSpecSlicer implements
InputSpecSlicer
}
/**
- * Split hint spec used by {@link #sliceDynamic(InputSpec, int, int, long)}.
+ * Split hint spec used by {@link #sliceDynamic(InputSpec, SegmentPruner,
int, int, long)}.
*/
static class DynamicSplitHintSpec implements SplitHintSpec
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSpecSlicer.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSpecSlicer.java
index d72482df4e9..7bc7f14205a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSpecSlicer.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSpecSlicer.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.input.inline;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
@@ -38,7 +40,7 @@ public class InlineInputSpecSlicer implements InputSpecSlicer
}
@Override
- public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+ public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable
SegmentPruner segmentPruner, int maxNumSlices)
{
return Collections.singletonList(new InlineInputSlice(((InlineInputSpec)
inputSpec).getDataSource()));
}
@@ -46,6 +48,7 @@ public class InlineInputSpecSlicer implements InputSpecSlicer
@Override
public List<InputSlice> sliceDynamic(
InputSpec inputSpec,
+ @Nullable SegmentPruner segmentPruner,
int maxNumSlices,
int maxFilesPerSlice,
long maxBytesPerSlice
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSpecSlicer.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSpecSlicer.java
index 27c4a7f2f24..8fe7770cc38 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSpecSlicer.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSpecSlicer.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.input.lookup;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
@@ -38,7 +40,7 @@ public class LookupInputSpecSlicer implements InputSpecSlicer
}
@Override
- public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+ public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable
SegmentPruner segmentPruner, int maxNumSlices)
{
return Collections.singletonList(new LookupInputSlice(((LookupInputSpec)
inputSpec).getLookupName()));
}
@@ -46,6 +48,7 @@ public class LookupInputSpecSlicer implements InputSpecSlicer
@Override
public List<InputSlice> sliceDynamic(
InputSpec inputSpec,
+ @Nullable SegmentPruner segmentPruner,
int maxNumSlices,
int maxFilesPerSlice,
long maxBytesPerSlice
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StageInputSpecSlicer.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StageInputSpecSlicer.java
index f3b5d23ae4f..74b092a12e5 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StageInputSpecSlicer.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StageInputSpecSlicer.java
@@ -25,7 +25,9 @@ import org.apache.druid.msq.exec.OutputChannelMode;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
@@ -56,7 +58,7 @@ public class StageInputSpecSlicer implements InputSpecSlicer
}
@Override
- public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+ public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable
SegmentPruner segmentPruner, int maxNumSlices)
{
final StageInputSpec stageInputSpec = (StageInputSpec) inputSpec;
@@ -91,6 +93,7 @@ public class StageInputSpecSlicer implements InputSpecSlicer
@Override
public List<InputSlice> sliceDynamic(
InputSpec inputSpec,
+ @Nullable SegmentPruner segmentPruner,
int maxNumSlices,
int maxFilesPerSlice,
long maxBytesPerSlice
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
index 697c65599a6..a7e88578110 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
@@ -29,13 +29,11 @@ import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.LoadableSegment;
import org.apache.druid.msq.input.PhysicalInputSlice;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.filter.DimFilter;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
/**
* Input spec representing a Druid table.
@@ -45,16 +43,9 @@ public class TableInputSpec implements InputSpec
{
private final String dataSource;
private final List<Interval> intervals;
-
@Nullable
private final List<SegmentDescriptor> segments;
- @Nullable
- private final DimFilter filter;
-
- @Nullable
- private final Set<String> filterFields;
-
/**
* Create a table input spec.
*
@@ -65,20 +56,12 @@ public class TableInputSpec implements InputSpec
* {@link LoadableSegment#descriptor()}.
* @param segments specific segments to read, or null to read all
segments in the intervals. If provided,
* only these segments will be read. Must not be empty
if non-null.
- * @param filter other filters to use for pruning, or null if no
pruning is desired. Pruning filters are
- * *not strict*, which means that processors must
re-apply them when processing the returned
- * {@link LoadableSegment} from {@link
PhysicalInputSlice#getLoadableSegments()}. This matches how
- * Broker-based pruning works for native queries.
- * @param filterFields list of fields from {@link
DimFilter#getRequiredColumns()} to consider for pruning. If null,
- * all fields are considered for pruning.
*/
@JsonCreator
public TableInputSpec(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("intervals") @Nullable List<Interval> intervals,
- @JsonProperty("segments") @Nullable List<SegmentDescriptor> segments,
- @JsonProperty("filter") @Nullable DimFilter filter,
- @JsonProperty("filterFields") @Nullable Set<String> filterFields
+ @JsonProperty("segments") @Nullable List<SegmentDescriptor> segments
)
{
this.dataSource = dataSource;
@@ -87,22 +70,6 @@ public class TableInputSpec implements InputSpec
throw new IAE("Can not supply empty segments as input, please use either
null or non-empty segments.");
}
this.segments = segments;
- this.filter = filter;
- this.filterFields = filterFields;
- }
-
- /**
- * @deprecated Use {@link #TableInputSpec(String, List, List, DimFilter,
Set)} with explicit null for segments instead.
- */
- @Deprecated
- public TableInputSpec(
- String dataSource,
- @Nullable List<Interval> intervals,
- @Nullable DimFilter filter,
- @Nullable Set<String> filterFields
- )
- {
- this(dataSource, intervals, null, filter, filterFields);
}
@JsonProperty
@@ -133,22 +100,6 @@ public class TableInputSpec implements InputSpec
return segments;
}
- @JsonProperty
- @JsonInclude(JsonInclude.Include.NON_NULL)
- @Nullable
- public DimFilter getFilter()
- {
- return filter;
- }
-
- @JsonProperty
- @JsonInclude(JsonInclude.Include.NON_NULL)
- @Nullable
- public Set<String> getFilterFields()
- {
- return filterFields;
- }
-
@Override
public boolean equals(Object o)
{
@@ -161,15 +112,13 @@ public class TableInputSpec implements InputSpec
TableInputSpec that = (TableInputSpec) o;
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(intervals, that.intervals)
- && Objects.equals(segments, that.segments)
- && Objects.equals(filter, that.filter)
- && Objects.equals(filterFields, that.filterFields);
+ && Objects.equals(segments, that.segments);
}
@Override
public int hashCode()
{
- return Objects.hash(dataSource, intervals, segments, filter, filterFields);
+ return Objects.hash(dataSource, intervals, segments);
}
@Override
@@ -179,8 +128,6 @@ public class TableInputSpec implements InputSpec
"dataSource='" + dataSource + '\'' +
", intervals=" + intervals +
(segments == null ? "" : ", segments=" + segments) +
- (filter == null ? "" : ", filter=" + filter) +
- (filterFields == null ? "" : ", filterFields=" + filterFields) +
'}';
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 736195155c6..f836a357d3c 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -50,6 +50,7 @@ import
org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.filter.SegmentPruner;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@@ -231,6 +232,16 @@ public class StageDefinition
}
}
+ /**
+ * Get a {@link SegmentPruner} from the {@link StageProcessor} for a given
'input number' from {@link #inputSpecs}.
+ * This can be used to best-effort prune the set of {@link
org.apache.druid.timeline.DataSegment} to process in order
+ * to reduce the working set before processing begins
+ */
+ public SegmentPruner getSegmentPruner(int inputNumber)
+ {
+ return processor.getPruner(inputSpecs.get(inputNumber), inputNumber);
+ }
+
/**
* Returns the {@link ShuffleSpec} for this stage, if {@link #doesShuffle()}.
*
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
index 0c9115ec437..4f9847bad0c 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
@@ -27,7 +27,9 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.OptionalInt;
@@ -48,11 +50,12 @@ public enum WorkerAssignmentStrategy
final InputSpec inputSpec,
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
+ @Nullable final SegmentPruner segmentPruner,
final int maxInputFilesPerSlice,
final long maxInputBytesPerSlice
)
{
- return slicer.sliceStatic(inputSpec, stageDef.getMaxWorkerCount());
+ return slicer.sliceStatic(inputSpec, segmentPruner,
stageDef.getMaxWorkerCount());
}
},
@@ -69,6 +72,7 @@ public enum WorkerAssignmentStrategy
final InputSpec inputSpec,
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
+ @Nullable final SegmentPruner segmentPruner,
final int maxInputFilesPerSlice,
final long maxInputBytesPerSlice
)
@@ -76,6 +80,7 @@ public enum WorkerAssignmentStrategy
if (slicer.canSliceDynamic(inputSpec)) {
return slicer.sliceDynamic(
inputSpec,
+ segmentPruner,
stageDef.getMaxWorkerCount(),
maxInputFilesPerSlice,
maxInputBytesPerSlice
@@ -92,7 +97,7 @@ public enum WorkerAssignmentStrategy
final IntSet inputStages = stageDef.getInputStageNumbers();
final OptionalInt maxInputStageWorkerCount =
inputStages.intStream().map(stageWorkerCountMap).max();
final int workerCount = Math.min(stageDef.getMaxWorkerCount(),
maxInputStageWorkerCount.orElse(1));
- return slicer.sliceStatic(inputSpec, workerCount);
+ return slicer.sliceStatic(inputSpec, segmentPruner, workerCount);
}
}
};
@@ -127,6 +132,7 @@ public enum WorkerAssignmentStrategy
InputSpec inputSpec,
Int2IntMap stageWorkerCountMap,
InputSpecSlicer slicer,
+ @Nullable SegmentPruner segmentPruner,
int maxInputFilesPerSlice,
long maxInputBytesPerSlice
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
index 6e1f00b1c84..ef825ed69d3 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
@@ -34,6 +34,7 @@ import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.query.filter.SegmentPruner;
import java.util.Arrays;
import java.util.Collections;
@@ -79,10 +80,11 @@ public class WorkerInputs
// Assign input slices to workers.
for (int inputNumber = 0; inputNumber < numInputs; inputNumber++) {
final InputSpec inputSpec = stageDef.getInputSpecs().get(inputNumber);
+ final SegmentPruner pruner = stageDef.getSegmentPruner(inputNumber);
if (stageDef.getBroadcastInputNumbers().contains(inputNumber)) {
// Broadcast case: send everything everywhere.
- final List<InputSlice> broadcastSlices = slicer.sliceStatic(inputSpec,
1);
+ final List<InputSlice> broadcastSlices = slicer.sliceStatic(inputSpec,
pruner, 1);
final InputSlice broadcastSlice = broadcastSlices.isEmpty()
? NilInputSlice.INSTANCE
:
Iterables.getOnlyElement(broadcastSlices);
@@ -100,6 +102,7 @@ public class WorkerInputs
inputSpec,
stageWorkerCountMap,
slicer,
+ pruner,
maxInputFilesPerWorker,
maxInputBytesPerWorker
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
index a4eaf099dfe..5ff11bbf723 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
@@ -139,7 +139,7 @@ public class ReadStage extends AbstractFrameProcessorStage
{
if (dataSource instanceof TableDataSource) {
TableDataSource ids = (TableDataSource) dataSource;
- TableInputSpec inputSpec = new TableInputSpec(ids.getName(),
Intervals.ONLY_ETERNITY, null, null, null);
+ TableInputSpec inputSpec = new TableInputSpec(ids.getName(),
Intervals.ONLY_ETERNITY, null);
return inputSpec;
}
if (dataSource instanceof InlineDataSource) {
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
index 148eea2b174..357829cdd29 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
@@ -19,6 +19,8 @@
package org.apache.druid.msq.querykit;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.ListenableFuture;
@@ -43,6 +45,7 @@ import org.apache.druid.msq.exec.std.ProcessorsAndChannels;
import org.apache.druid.msq.exec.std.StandardPartitionReader;
import org.apache.druid.msq.exec.std.StandardStageRunner;
import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.PhysicalInputSlice;
import org.apache.druid.msq.input.external.ExternalInputSlice;
import org.apache.druid.msq.input.stage.StageInputSlice;
@@ -50,6 +53,7 @@ import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
+import org.apache.druid.query.filter.SegmentPruner;
import org.apache.druid.query.planning.ExecutionVertex;
import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.utils.CollectionUtils;
@@ -70,10 +74,12 @@ import java.util.function.Function;
public abstract class BaseLeafStageProcessor extends BasicStageProcessor
{
private final Query<?> query;
+ private final Supplier<ExecutionVertex> executionVertexSupplier;
protected BaseLeafStageProcessor(Query<?> query)
{
this.query = query;
+ this.executionVertexSupplier = Suppliers.memoize(() ->
ExecutionVertex.of(query));
}
@Override
@@ -153,7 +159,7 @@ public abstract class BaseLeafStageProcessor extends
BasicStageProcessor
final ProcessorManager processorManager;
if (segmentMapFnProcessor == null) {
- final SegmentMapFunction segmentMapFn =
ExecutionVertex.of(query).createSegmentMapFunction(frameContext.policyEnforcer());
+ final SegmentMapFunction segmentMapFn =
executionVertexSupplier.get().createSegmentMapFunction(frameContext.policyEnforcer());
processorManager =
processorManagerFn.apply(ImmutableList.of(segmentMapFn));
} else {
processorManager = new ChainedProcessorManager<>(
@@ -171,6 +177,13 @@ public abstract class BaseLeafStageProcessor extends
BasicStageProcessor
);
}
+ @Nullable
+ @Override
+ public SegmentPruner getPruner(InputSpec inputSpec, int inputNumber)
+ {
+ return executionVertexSupplier.get().getSegmentPruner();
+ }
+
private ProcessorManager<Object, Long>
createBaseLeafProcessorManagerWithHandoff(
final ExecutionContext context,
final ReadableInputQueue baseInputQueue,
@@ -337,7 +350,7 @@ public abstract class BaseLeafStageProcessor extends
BasicStageProcessor
final Int2ObjectMap<ReadableInput> broadcastInputs =
readBroadcastInputsFromEarlierStages(context);
if (broadcastInputs.isEmpty()) {
- if (ExecutionVertex.of(query).isSegmentMapFunctionExpensive()) {
+ if (executionVertexSupplier.get().isSegmentMapFunctionExpensive()) {
// Joins may require significant computation to compute the
segmentMapFn. Offload it to a processor.
return new SimpleSegmentMapFnProcessor(query,
context.frameContext().policyEnforcer());
} else {
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index 51850d7046e..bdc4370f9e7 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -57,8 +57,6 @@ import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
-import org.apache.druid.query.filter.DimFilter;
-import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.query.planning.JoinDataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -79,7 +77,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -137,9 +134,6 @@ public class DataSourcePlan
* @param dataSource datasource to plan
* @param querySegmentSpec intervals for mandatory pruning. Must be {@link
MultipleIntervalSegmentSpec}. The returned
* plan is guaranteed to be filtered to this
interval.
- * @param filter filter for best-effort pruning. The returned plan
may or may not be filtered to this
- * filter. Query processing must still apply the
filter to generated correct results.
- * @param filterFields which fields from the filter to consider for
pruning, or null to consider all fields.
* @param minStageNumber starting stage number for subqueries
* @param broadcast whether the plan should broadcast data for this
datasource
*/
@@ -148,38 +142,21 @@ public class DataSourcePlan
final QueryContext queryContext,
final DataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
- @Nullable DimFilter filter,
- @Nullable Set<String> filterFields,
final int minStageNumber,
final boolean broadcast
)
{
- if (!queryContext.isSecondaryPartitionPruningEnabled()) {
- // Clear filter, we don't want to prune today.
- filter = null;
- filterFields = null;
- }
-
- if (filter != null && filterFields == null) {
- // Ensure filterFields is nonnull if filter is nonnull. Helps for other
forXYZ methods, so they don't need to
- // deal with the case where filter is nonnull but filterFields is null.
- filterFields = filter.getRequiredColumns();
- }
if (dataSource instanceof TableDataSource) {
return forTable(
(TableDataSource) dataSource,
querySegmentSpec,
- filter,
- filterFields,
broadcast
);
} else if (dataSource instanceof RestrictedDataSource) {
return forRestricted(
(RestrictedDataSource) dataSource,
querySegmentSpec,
- filter,
- filterFields,
broadcast
);
} else if (dataSource instanceof ExternalDataSource) {
@@ -222,8 +199,6 @@ public class DataSourcePlan
queryContext,
(UnionDataSource) dataSource,
querySegmentSpec,
- filter,
- filterFields,
minStageNumber,
broadcast
);
@@ -242,8 +217,6 @@ public class DataSourcePlan
queryContext,
joinDataSource,
querySegmentSpec,
- filter,
- filterFields,
minStageNumber,
broadcast
);
@@ -369,8 +342,6 @@ public class DataSourcePlan
private static DataSourcePlan forTable(
final TableDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
- @Nullable final DimFilter filter,
- @Nullable final Set<String> filterFields,
final boolean broadcast
)
{
@@ -385,7 +356,7 @@ public class DataSourcePlan
List<Interval> intervals = querySegmentSpec.getIntervals();
return new DataSourcePlan(
(broadcast && dataSource.isGlobal()) ? dataSource : new
InputNumberDataSource(0),
- List.of(new TableInputSpec(dataSource.getName(), intervals, segments,
filter, filterFields)),
+ List.of(new TableInputSpec(dataSource.getName(), intervals, segments)),
broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
null
);
@@ -394,15 +365,13 @@ public class DataSourcePlan
private static DataSourcePlan forRestricted(
final RestrictedDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
- @Nullable final DimFilter filter,
- @Nullable final Set<String> filterFields,
final boolean broadcast
)
{
DataSource restricted = (broadcast && dataSource.isGlobal())
? dataSource
: new RestrictedInputNumberDataSource(0,
dataSource.getPolicy());
- return forTable(dataSource.getBase(), querySegmentSpec, filter,
filterFields, broadcast).withDataSource(restricted);
+ return forTable(dataSource.getBase(), querySegmentSpec,
broadcast).withDataSource(restricted);
}
private static DataSourcePlan forExternal(
@@ -490,8 +459,6 @@ public class DataSourcePlan
queryContext,
dataSource.getBase(),
querySegmentSpec,
- null,
- null,
minStageNumber,
broadcast
);
@@ -527,8 +494,6 @@ public class DataSourcePlan
queryContext,
dataSource.getBase(),
querySegmentSpec,
- null,
- null,
minStageNumber,
broadcast
);
@@ -557,8 +522,6 @@ public class DataSourcePlan
final QueryContext queryContext,
final UnionDataSource unionDataSource,
final QuerySegmentSpec querySegmentSpec,
- @Nullable DimFilter filter,
- @Nullable Set<String> filterFields,
final int minStageNumber,
final boolean broadcast
)
@@ -577,8 +540,6 @@ public class DataSourcePlan
queryContext,
child,
querySegmentSpec,
- filter,
- filterFields,
Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
broadcast
);
@@ -606,8 +567,6 @@ public class DataSourcePlan
final QueryContext queryContext,
final JoinDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
- @Nullable final DimFilter filter,
- @Nullable final Set<String> filterFields,
final int minStageNumber,
final boolean broadcast
)
@@ -620,8 +579,6 @@ public class DataSourcePlan
queryContext,
analysis.getBaseDataSource(),
querySegmentSpec,
- filter,
- filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields,
analysis::isBaseColumn),
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
broadcast
);
@@ -638,8 +595,6 @@ public class DataSourcePlan
queryContext,
clause.getDataSource(),
new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY),
- null, // Don't push down query filters for right-hand side: needs
some work to ensure it works properly.
- null,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
true // Always broadcast right-hand side of the join.
);
@@ -805,9 +760,7 @@ public class DataSourcePlan
* Verify that the provided {@link QuerySegmentSpec} is a {@link
MultipleIntervalSegmentSpec} with
* interval {@link Intervals#ETERNITY}. If not, throw an {@link
UnsupportedOperationException}.
* <p>
- * We don't need to support this for anything that is not {@link
DataSourceAnalysis#isTableBased()}, because
- * the SQL layer avoids "intervals" in other cases. See
- * {@link
org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}.
+ * See {@link
org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}.
*/
private static void checkQuerySegmentSpecIsEternity(
final DataSource dataSource,
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index 94ef7ac4cbf..7a2ec6f239a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -78,8 +78,6 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
originalQuery.context(),
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
- originalQuery.getFilter(),
- null,
minStageNumber,
false
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 6dacc20c6ce..8ace680938c 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -83,8 +83,6 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
originalQuery.context(),
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
- originalQuery.getFilter(),
- null,
minStageNumber,
false
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
index 95f3d200b33..f6cc9222df9 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
@@ -88,8 +88,6 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
originalQuery.context(),
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
- originalQuery.getFilter(),
- null,
minStageNumber,
false
);
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
index 2ed6fefbfe5..b18ee8dde53 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
@@ -45,6 +45,7 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.query.filter.FilterSegmentPruner;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
@@ -235,11 +236,11 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
{
// This slicer cannot sliceDynamic.
- final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null, null, null);
+ final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null);
Assertions.assertFalse(slicer.canSliceDynamic(inputSpec));
Assertions.assertThrows(
UnsupportedOperationException.class,
- () -> slicer.sliceDynamic(inputSpec, 1, 1, 1)
+ () -> slicer.sliceDynamic(inputSpec, null, 1, 1, 1)
);
}
@@ -249,8 +250,8 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
// When 1 slice is requested, all segments are assigned to one server,
even if that server doesn't actually
// currently serve those segments.
- final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null, null, null);
- final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 1);
+ final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null);
+ final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null,
1);
Assertions.assertEquals(
ImmutableList.of(
new SegmentsInputSlice(
@@ -305,8 +306,8 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
{
// When 2 slices are requested, we assign segments to the servers that
have those segments.
- final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null, null, null);
- final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
+ final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null);
+ final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null,
2);
Assertions.assertEquals(
ImmutableList.of(
new SegmentsInputSlice(
@@ -367,8 +368,8 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
{
// When 3 slices are requested, only 2 are returned, because we only have
two workers.
- final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null, null, null);
- final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 3);
+ final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null);
+ final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null,
3);
Assertions.assertEquals(
ImmutableList.of(
new SegmentsInputSlice(
@@ -428,8 +429,8 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
@Test
public void test_sliceStatic_nonexistentTable()
{
- final TableInputSpec inputSpec = new
TableInputSpec(DATASOURCE_NONEXISTENT, null, null, null, null);
- final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 1);
+ final TableInputSpec inputSpec = new
TableInputSpec(DATASOURCE_NONEXISTENT, null, null);
+ final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null,
1);
Assertions.assertEquals(
Collections.emptyList(),
inputSlices
@@ -444,12 +445,14 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec inputSpec = new TableInputSpec(
DATASOURCE,
null,
- null,
+ null
+ );
+ final FilterSegmentPruner pruner = new FilterSegmentPruner(
new EqualityFilter(PARTITION_DIM, ColumnType.STRING, "abc", null),
null
);
- final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
+ final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, pruner,
2);
Assertions.assertEquals(
ImmutableList.of(
@@ -507,12 +510,10 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec inputSpec = new TableInputSpec(
DATASOURCE,
Collections.singletonList(Intervals.of("2000/P1Y")),
- null,
- null,
null
);
- final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
+ final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null,
2);
Assertions.assertEquals(
ImmutableList.of(
@@ -553,8 +554,8 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
// When 2 slices are requested, we assign segments to the servers that
have those segments.
- final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null, null, null);
- final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
+ final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null);
+ final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, null,
2);
// Expect segment 2 and then the realtime segments 5 and 6 to be assigned
round-robin.
Assertions.assertEquals(
ImmutableList.of(
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java
index 1b7510fd136..84044c64b5b 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java
@@ -63,7 +63,7 @@ public class InputSpecsTest
{
Assert.assertFalse(
InputSpecs.hasLeafInputs(
- ImmutableList.of(new TableInputSpec("tbl", null, null, null,
null)),
+ ImmutableList.of(new TableInputSpec("tbl", null, null)),
IntSet.of(0)
)
);
@@ -75,7 +75,7 @@ public class InputSpecsTest
Assert.assertTrue(
InputSpecs.hasLeafInputs(
ImmutableList.of(
- new TableInputSpec("tbl", null, null, null, null),
+ new TableInputSpec("tbl", null, null),
new StageInputSpec(0)
),
IntSets.emptySet()
@@ -89,7 +89,7 @@ public class InputSpecsTest
Assert.assertTrue(
InputSpecs.hasLeafInputs(
ImmutableList.of(
- new TableInputSpec("tbl", null, null, null, null),
+ new TableInputSpec("tbl", null, null),
new StageInputSpec(0)
),
IntSet.of(1)
@@ -103,7 +103,7 @@ public class InputSpecsTest
Assert.assertFalse(
InputSpecs.hasLeafInputs(
ImmutableList.of(
- new TableInputSpec("tbl", null, null, null, null),
+ new TableInputSpec("tbl", null, null),
new StageInputSpec(0)
),
IntSet.of(0)
@@ -117,8 +117,8 @@ public class InputSpecsTest
Assert.assertTrue(
InputSpecs.hasLeafInputs(
ImmutableList.of(
- new TableInputSpec("tbl", null, null, null, null),
- new TableInputSpec("tbl2", null, null, null, null)
+ new TableInputSpec("tbl", null, null),
+ new TableInputSpec("tbl2", null, null)
),
IntSet.of(1)
)
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
index d8dff6fe082..17692d3fc7c 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
@@ -82,7 +82,7 @@ public class ExternalInputSpecSlicerTest
{
Assert.assertEquals(
ImmutableList.of(unsplittableSlice("foo", "bar", "baz")),
- slicer.sliceStatic(unsplittableSpec("foo", "bar", "baz"), 2)
+ slicer.sliceStatic(unsplittableSpec("foo", "bar", "baz"), null, 2)
);
}
@@ -91,7 +91,7 @@ public class ExternalInputSpecSlicerTest
{
Assert.assertEquals(
ImmutableList.of(unsplittableSlice()),
- slicer.sliceStatic(unsplittableSpec(), 2)
+ slicer.sliceStatic(unsplittableSpec(), null, 2)
);
}
@@ -103,7 +103,7 @@ public class ExternalInputSpecSlicerTest
splittableSlice("foo", "baz"),
splittableSlice("bar")
),
- slicer.sliceStatic(splittableSpec("foo", "bar", "baz"), 2)
+ slicer.sliceStatic(splittableSpec("foo", "bar", "baz"), null, 2)
);
}
@@ -116,7 +116,7 @@ public class ExternalInputSpecSlicerTest
splittableSlice("bar"),
splittableSlice("baz")
),
- slicer.sliceStatic(splittableSpec("foo", "bar", "baz"), 5)
+ slicer.sliceStatic(splittableSpec("foo", "bar", "baz"), null, 5)
);
}
@@ -125,7 +125,7 @@ public class ExternalInputSpecSlicerTest
{
Assert.assertEquals(
ImmutableList.of(),
- slicer.sliceStatic(splittableSpec(), 2)
+ slicer.sliceStatic(splittableSpec(), null, 2)
);
}
@@ -137,7 +137,7 @@ public class ExternalInputSpecSlicerTest
splittableSlice("foo", "baz"),
splittableSlice("bar")
),
- slicer.sliceStatic(splittableSpecThatIgnoresSplitHints("foo", "bar",
"baz"), 2)
+ slicer.sliceStatic(splittableSpecThatIgnoresSplitHints("foo", "bar",
"baz"), null, 2)
);
}
@@ -148,7 +148,7 @@ public class ExternalInputSpecSlicerTest
ImmutableList.of(
unsplittableSlice("foo", "bar", "baz")
),
- slicer.sliceDynamic(unsplittableSpec("foo", "bar", "baz"), 100, 1, 1)
+ slicer.sliceDynamic(unsplittableSpec("foo", "bar", "baz"), null, 100,
1, 1)
);
}
@@ -159,7 +159,7 @@ public class ExternalInputSpecSlicerTest
ImmutableList.of(
splittableSlice("foo", "bar", "baz")
),
- slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), 100, 5,
Long.MAX_VALUE)
+ slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), null, 100, 5,
Long.MAX_VALUE)
);
}
@@ -171,7 +171,7 @@ public class ExternalInputSpecSlicerTest
splittableSlice("foo", "bar"),
splittableSlice("baz")
),
- slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), 100, 2,
Long.MAX_VALUE)
+ slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), null, 100, 2,
Long.MAX_VALUE)
);
}
@@ -183,7 +183,7 @@ public class ExternalInputSpecSlicerTest
splittableSlice("foo", "bar"),
splittableSlice("baz")
),
- slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), 100, 5, 7)
+ slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), null, 100, 5,
7)
);
}
@@ -196,7 +196,7 @@ public class ExternalInputSpecSlicerTest
splittableSlice("bar.gz"),
splittableSlice("baz.gz")
),
- slicer.sliceDynamic(splittableSpec("foo.gz", "bar.gz", "baz.gz"), 100,
5, 7)
+ slicer.sliceDynamic(splittableSpec("foo.gz", "bar.gz", "baz.gz"),
null, 100, 5, 7)
);
}
@@ -209,7 +209,7 @@ public class ExternalInputSpecSlicerTest
splittableSlice("bar"),
splittableSlice("baz")
),
- slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar",
"baz"), 100, 5, 7)
+ slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar",
"baz"), null, 100, 5, 7)
);
}
@@ -221,7 +221,7 @@ public class ExternalInputSpecSlicerTest
splittableSlice("foo", "baz"),
splittableSlice("bar")
),
- slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar",
"baz"), 2, 2, Long.MAX_VALUE)
+ slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar",
"baz"), null, 2, 2, Long.MAX_VALUE)
);
}
@@ -232,7 +232,7 @@ public class ExternalInputSpecSlicerTest
ImmutableList.of(
splittableSlice("foo", "bar", "baz")
),
- slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar",
"baz"), 1, 5, Long.MAX_VALUE)
+ slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar",
"baz"), null, 1, 5, Long.MAX_VALUE)
);
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StageInputSpecSlicerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StageInputSpecSlicerTest.java
index 024ad956cf2..a3ec7d6e5b2 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StageInputSpecSlicerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StageInputSpecSlicerTest.java
@@ -78,7 +78,7 @@ public class StageInputSpecSlicerTest
OutputChannelMode.LOCAL_STORAGE
)
),
- slicer.sliceStatic(new StageInputSpec(0), 1)
+ slicer.sliceStatic(new StageInputSpec(0), null, 1)
);
}
@@ -98,7 +98,7 @@ public class StageInputSpecSlicerTest
OutputChannelMode.LOCAL_STORAGE
)
),
- slicer.sliceStatic(new StageInputSpec(0), 2)
+ slicer.sliceStatic(new StageInputSpec(0), null, 2)
);
}
@@ -118,7 +118,7 @@ public class StageInputSpecSlicerTest
OutputChannelMode.LOCAL_STORAGE
)
),
- slicer.sliceStatic(new StageInputSpec(1), 2)
+ slicer.sliceStatic(new StageInputSpec(1), null, 2)
);
}
@@ -127,7 +127,7 @@ public class StageInputSpecSlicerTest
{
final IllegalStateException e = Assert.assertThrows(
IllegalStateException.class,
- () -> slicer.sliceStatic(new StageInputSpec(3), 1)
+ () -> slicer.sliceStatic(new StageInputSpec(3), null, 1)
);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("Stage[3]
output partitions not available"));
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
index 28be6956a13..55ba95a9d98 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
@@ -30,7 +30,9 @@ import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer;
import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.query.filter.FilterSegmentPruner;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
@@ -144,16 +146,16 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
@Test
public void test_canSliceDynamic()
{
- Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE,
null, null, null, null)));
+ Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE,
null, null)));
}
@Test
public void test_sliceStatic_noDataSource()
{
- final TableInputSpec spec = new TableInputSpec("no such datasource", null,
null, null, null);
+ final TableInputSpec spec = new TableInputSpec("no such datasource", null,
null);
Assert.assertEquals(
ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
- slicer.sliceStatic(spec, 2)
+ slicer.sliceStatic(spec, null, 2)
);
}
@@ -166,8 +168,6 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
Intervals.of("2000/P1M"),
Intervals.of("2000-06-01/P1M")
),
- null,
- null,
null
);
@@ -204,7 +204,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
ImmutableList.of()
)
),
- slicer.sliceStatic(spec, 1)
+ slicer.sliceStatic(spec, null, 1)
);
}
@@ -214,14 +214,12 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
Collections.singletonList(Intervals.of("2002/P1M")),
- null,
- null,
null
);
Assert.assertEquals(
ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
- slicer.sliceStatic(spec, 2)
+ slicer.sliceStatic(spec, null, 2)
);
}
@@ -235,9 +233,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
SEGMENT1.getInterval(),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum()
- )),
- null,
- null
+ ))
);
RichSegmentDescriptor expectedSegment = new RichSegmentDescriptor(
@@ -248,7 +244,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
);
Assert.assertEquals(
List.of(new SegmentsInputSlice(DATASOURCE, List.of(expectedSegment),
List.of())),
- slicer.sliceStatic(spec, 1));
+ slicer.sliceStatic(spec, null, 1));
}
@Test
@@ -261,14 +257,12 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
SEGMENT1.getInterval(),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum()
- )),
- null,
- null
+ ))
);
Assert.assertEquals(
ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
- slicer.sliceStatic(spec, 2)
+ slicer.sliceStatic(spec, null, 2)
);
}
@@ -278,8 +272,10 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
null,
- null,
- new SelectorDimFilter("dim", "bar", null),
+ null
+ );
+ final FilterSegmentPruner pruner = new FilterSegmentPruner(
+ new EqualityFilter("dim", ColumnType.STRING, "bar", null),
null
);
@@ -299,7 +295,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
),
NilInputSlice.INSTANCE
),
- slicer.sliceStatic(spec, 2)
+ slicer.sliceStatic(spec, pruner, 2)
);
}
@@ -309,8 +305,10 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
null,
- null,
- new SelectorDimFilter("dim", "bar", null),
+ null
+ );
+ final FilterSegmentPruner segmentPruner = new FilterSegmentPruner(
+ new EqualityFilter("dim", ColumnType.STRING, "bar", null),
Collections.emptySet()
);
@@ -335,7 +333,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
ImmutableList.of()
)
),
- slicer.sliceStatic(spec, 1)
+ slicer.sliceStatic(spec, segmentPruner, 1)
);
}
@@ -348,8 +346,10 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
Intervals.of("2000/P1M"),
Intervals.of("2000-06-01/P1M")
),
- null,
- new SelectorDimFilter("dim", "bar", null),
+ null
+ );
+ final FilterSegmentPruner segmentPruner = new FilterSegmentPruner(
+ new EqualityFilter("dim", ColumnType.STRING, "bar", null),
null
);
@@ -380,14 +380,14 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
ImmutableList.of()
)
),
- slicer.sliceStatic(spec, 2)
+ slicer.sliceStatic(spec, segmentPruner, 2)
);
}
@Test
public void test_sliceStatic_oneSlice()
{
- final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null,
null, null);
+ final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
Assert.assertEquals(
Collections.singletonList(
new SegmentsInputSlice(
@@ -409,14 +409,14 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
ImmutableList.of()
)
),
- slicer.sliceStatic(spec, 1)
+ slicer.sliceStatic(spec, null, 1)
);
}
@Test
public void test_sliceStatic_needTwoSlices()
{
- final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null,
null, null);
+ final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
Assert.assertEquals(
ImmutableList.of(
new SegmentsInputSlice(
@@ -444,14 +444,14 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
ImmutableList.of()
)
),
- slicer.sliceStatic(spec, 2)
+ slicer.sliceStatic(spec, null, 2)
);
}
@Test
public void test_sliceStatic_threeSlices()
{
- final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null,
null, null);
+ final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
Assert.assertEquals(
ImmutableList.of(
new SegmentsInputSlice(
@@ -480,7 +480,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
),
NilInputSlice.INSTANCE
),
- slicer.sliceStatic(spec, 3)
+ slicer.sliceStatic(spec, null, 3)
);
}
@@ -490,13 +490,12 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2002/P1M")),
- null,
null
);
Assert.assertEquals(
Collections.emptyList(),
- slicer.sliceDynamic(spec, 1, 1, 1)
+ slicer.sliceDynamic(spec, null, 1, 1, 1)
);
}
@@ -506,7 +505,6 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2000/P1M")),
- null,
null
);
@@ -531,7 +529,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
ImmutableList.of()
)
),
- slicer.sliceDynamic(spec, 1, 1, 1)
+ slicer.sliceDynamic(spec, null, 1, 1, 1)
);
}
@@ -541,7 +539,6 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2000/P1M")),
- null,
null
);
@@ -566,7 +563,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
ImmutableList.of()
)
),
- slicer.sliceDynamic(spec, 100, 5, BYTES_PER_SEGMENT * 5)
+ slicer.sliceDynamic(spec, null, 100, 5, BYTES_PER_SEGMENT * 5)
);
}
@@ -576,7 +573,6 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2000/P1M")),
- null,
null
);
@@ -607,7 +603,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
ImmutableList.of()
)
),
- slicer.sliceDynamic(spec, 100, 1, BYTES_PER_SEGMENT * 5)
+ slicer.sliceDynamic(spec, null, 100, 1, BYTES_PER_SEGMENT * 5)
);
}
@@ -617,7 +613,6 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
final TableInputSpec spec = new TableInputSpec(
DATASOURCE,
ImmutableList.of(Intervals.of("2000/P1M")),
- null,
null
);
@@ -648,7 +643,7 @@ public class IndexerTableInputSpecSlicerTest extends
InitializedNullHandlingTest
ImmutableList.of()
)
),
- slicer.sliceDynamic(spec, 100, 5, BYTES_PER_SEGMENT)
+ slicer.sliceDynamic(spec, null, 100, 5, BYTES_PER_SEGMENT)
);
}
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
index 452003311db..447f4f65479 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
@@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
@@ -35,38 +34,17 @@ import java.util.Collections;
public class TableInputSpecTest extends InitializedNullHandlingTest
{
- @Test
- public void testSerde() throws Exception
- {
- final ObjectMapper mapper = TestHelper.makeJsonMapper()
- .registerModules(new
MSQIndexingModule().getJacksonModules());
- final TableInputSpec spec = new TableInputSpec(
- "myds",
- Collections.singletonList(Intervals.of("2000/P1M")),
- null,
- new SelectorDimFilter("dim", "val", null),
- Collections.singleton("dim")
- );
-
- Assert.assertEquals(
- spec,
- mapper.readValue(mapper.writeValueAsString(spec), InputSpec.class)
- );
- }
+ private final ObjectMapper mapper = TestHelper.makeJsonMapper()
+ .registerModules(new
MSQIndexingModule().getJacksonModules());
@Test
- public void testSerdeEmptyFilterFields() throws Exception
+ public void testSerde() throws Exception
{
- final ObjectMapper mapper = TestHelper.makeJsonMapper()
- .registerModules(new
MSQIndexingModule().getJacksonModules());
-
final TableInputSpec spec = new TableInputSpec(
"myds",
Collections.singletonList(Intervals.of("2000/P1M")),
- null,
- new SelectorDimFilter("dim", "val", null),
- Collections.emptySet()
+ null
);
Assert.assertEquals(
@@ -75,17 +53,13 @@ public class TableInputSpecTest extends
InitializedNullHandlingTest
);
}
+
@Test
public void testSerdeEternityInterval() throws Exception
{
- final ObjectMapper mapper = TestHelper.makeJsonMapper()
- .registerModules(new
MSQIndexingModule().getJacksonModules());
-
final TableInputSpec spec = new TableInputSpec(
"myds",
Intervals.ONLY_ETERNITY,
- null,
- new SelectorDimFilter("dim", "val", null),
null
);
@@ -98,15 +72,10 @@ public class TableInputSpecTest extends
InitializedNullHandlingTest
@Test
public void testSerdeWithSegments() throws Exception
{
- final ObjectMapper mapper = TestHelper.makeJsonMapper()
- .registerModules(new
MSQIndexingModule().getJacksonModules());
-
final TableInputSpec spec = new TableInputSpec(
"myds",
Collections.singletonList(Intervals.of("2000/P1M")),
- Collections.singletonList(new
SegmentDescriptor(Intervals.of("2000/P1M"), "version", 0)),
- new SelectorDimFilter("dim", "val", null),
- Collections.singleton("dim")
+ Collections.singletonList(new
SegmentDescriptor(Intervals.of("2000/P1M"), "version", 0))
);
Assert.assertEquals(
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerTestInputSpecSlicer.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerTestInputSpecSlicer.java
index 4d4cd9d9fc5..3a5fedb1074 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerTestInputSpecSlicer.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerTestInputSpecSlicer.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.kernel.controller;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.query.filter.SegmentPruner;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +37,7 @@ public class ControllerTestInputSpecSlicer implements
InputSpecSlicer
}
@Override
- public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+ public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable
SegmentPruner segmentPruner, int maxNumSlices)
{
final List<InputSlice> slices = new ArrayList<>(maxNumSlices);
for (int i = 0; i < maxNumSlices; i++) {
@@ -47,6 +49,7 @@ public class ControllerTestInputSpecSlicer implements
InputSpecSlicer
@Override
public List<InputSlice> sliceDynamic(
InputSpec inputSpec,
+ @Nullable SegmentPruner segmentPruner,
int maxNumSlices,
int maxFilesPerSlice,
long maxBytesPerSlice
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
index 43f3c5c5635..eae23222622 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
@@ -45,10 +45,12 @@ import
org.apache.druid.msq.input.stage.StripedReadablePartitions;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.querykit.common.OffsetLimitStageProcessor;
+import org.apache.druid.query.filter.SegmentPruner;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -466,7 +468,7 @@ public class WorkerInputsTest
);
Mockito.verify(testInputSpecSlicer,
times(0)).canSliceDynamic(inputSpecToSplit);
- Mockito.verify(testInputSpecSlicer, times(1)).sliceStatic(any(), anyInt());
+ Mockito.verify(testInputSpecSlicer, times(1)).sliceStatic(any(), any(),
anyInt());
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
@@ -511,7 +513,7 @@ public class WorkerInputsTest
);
Mockito.verify(testInputSpecSlicer,
times(1)).canSliceDynamic(inputSpecToSplit);
- Mockito.verify(testInputSpecSlicer, times(1)).sliceDynamic(any(),
anyInt(), anyInt(), anyLong());
+ Mockito.verify(testInputSpecSlicer, times(1)).sliceDynamic(any(), any(),
anyInt(), anyInt(), anyLong());
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
@@ -657,7 +659,7 @@ public class WorkerInputsTest
}
@Override
- public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
+ public List<InputSlice> sliceStatic(InputSpec inputSpec, @Nullable
SegmentPruner segmentPruner, int maxNumSlices)
{
final TestInputSpec testInputSpec = (TestInputSpec) inputSpec;
final List<List<Long>> assignments =
@@ -672,6 +674,7 @@ public class WorkerInputsTest
@Override
public List<InputSlice> sliceDynamic(
InputSpec inputSpec,
+ @Nullable SegmentPruner segmentPruner,
int maxNumSlices,
int maxFilesPerSlice,
long maxBytesPerSlice
diff --git
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
index 814f0809393..cea0ddfa84a 100644
---
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
+++
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq
@@ -23,14 +23,7 @@ order by 1;
"input" : [ {
"type" : "table",
"dataSource" : "wikipedia",
- "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
- "filter" : {
- "type" : "inType",
- "column" : "cityName",
- "matchValueType" : "STRING",
- "sortedValues" : [ "Aarhus", "New York" ]
- },
- "filterFields" : [ "cityName" ]
+ "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
} ],
"processor" : {
"type" : "groupByPreShuffle",
diff --git
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq
index 3dfead41f01..1de39db46fc 100644
---
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq
+++
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq
@@ -13,14 +13,7 @@ where w1.cityName='New York';
"input" : [ {
"type" : "table",
"dataSource" : "wikipedia",
- "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
- "filter" : {
- "type" : "equals",
- "column" : "cityName",
- "matchValueType" : "STRING",
- "matchValue" : "New York"
- },
- "filterFields" : [ "cityName" ]
+ "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
} ],
"processor" : {
"type" : "scan",
@@ -254,14 +247,7 @@ LogicalJoin:[[broadcast inheritPath:[0, 0]]]
"input" : [ {
"type" : "table",
"dataSource" : "wikipedia",
- "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
- "filter" : {
- "type" : "equals",
- "column" : "cityName",
- "matchValueType" : "STRING",
- "matchValue" : "New York"
- },
- "filterFields" : [ "cityName" ]
+ "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
} ],
"processor" : {
"type" : "scan",
@@ -496,14 +482,7 @@ LogicalJoin:[[sort_merge inheritPath:[0, 0]]]
"input" : [ {
"type" : "table",
"dataSource" : "wikipedia",
- "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
- "filter" : {
- "type" : "equals",
- "column" : "cityName",
- "matchValueType" : "STRING",
- "matchValue" : "New York"
- },
- "filterFields" : [ "cityName" ]
+ "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
} ],
"processor" : {
"type" : "scan",
diff --git
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqNestedJoinHint.iq
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqNestedJoinHint.iq
index 617ae774dc2..b71933ba6dc 100644
---
a/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqNestedJoinHint.iq
+++
b/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqNestedJoinHint.iq
@@ -143,14 +143,7 @@ where w1.cityName='New York';
"input" : [ {
"type" : "table",
"dataSource" : "wikipedia",
- "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
- "filter" : {
- "type" : "equals",
- "column" : "cityName",
- "matchValueType" : "STRING",
- "matchValue" : "New York"
- },
- "filterFields" : [ "cityName" ]
+ "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
}, {
"type" : "stage",
"stage" : 0
diff --git
a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
index 29babc115d0..ad364a754f0 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
@@ -19,21 +19,8 @@
package org.apache.druid.query.filter;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.RangeSet;
-import org.apache.druid.timeline.partition.ShardSpec;
-
-import javax.annotation.Nullable;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
/**
*
@@ -90,86 +77,4 @@ public class DimFilterUtils
}
return retVal.array();
}
-
- /**
- * Filter the given iterable of objects by removing any object whose
ShardSpec, obtained from the converter function,
- * does not fit in the RangeSet of the dimFilter {@link
DimFilter#getDimensionRangeSet(String)}. The returned set
- * contains the filtered objects in the same order as they appear in input.
- *
- * DimensionRangedCache stores the RangeSets of different dimensions for the
dimFilter. It should be re-used
- * between calls with the same dimFilter to save redundant calls of {@link
DimFilter#getDimensionRangeSet(String)}
- * on same dimensions.
- *
- * @param dimFilter The filter to use
- * @param filterFields Set of fields to consider for pruning, or null
to consider all fields
- * @param input The iterable of objects to be filtered
- * @param converter The function to convert T to ShardSpec that
can be filtered by
- * @param dimensionRangeCache The cache of RangeSets of different dimensions
for the dimFilter
- * @param <T> This can be any type, as long as transform
function is provided to convert this to ShardSpec
- *
- * @return The set of filtered object, in the same order as input
- */
- public static <T> Set<T> filterShards(
- @Nullable final DimFilter dimFilter,
- @Nullable final Set<String> filterFields,
- final Iterable<T> input,
- final Function<T, ShardSpec> converter,
- final Map<String, Optional<RangeSet<String>>> dimensionRangeCache
- )
- {
- if (dimFilter == null) {
- // ImmutableSet retains order from "input".
- return ImmutableSet.copyOf(input);
- }
-
- // LinkedHashSet retains order from "input".
- Set<T> retSet = new LinkedHashSet<>();
-
- for (T obj : input) {
- ShardSpec shard = converter.apply(obj);
- boolean include = true;
-
- if (shard != null) {
- Map<String, RangeSet<String>> filterDomain = new HashMap<>();
- List<String> dimensions = shard.getDomainDimensions();
- for (String dimension : dimensions) {
- if (filterFields == null || filterFields.contains(dimension)) {
- Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
- .computeIfAbsent(dimension, d ->
Optional.ofNullable(dimFilter.getDimensionRangeSet(d)));
-
- if (optFilterRangeSet.isPresent()) {
- filterDomain.put(dimension, optFilterRangeSet.get());
- }
- }
- }
- if (!filterDomain.isEmpty() && !shard.possibleInDomain(filterDomain)) {
- include = false;
- }
- }
-
- if (include) {
- retSet.add(obj);
- }
- }
- return retSet;
- }
-
- /**
- * Returns a copy of "fields" filtered by the predicate function.
- */
- public static Set<String> onlyBaseFields(
- final Set<String> fields,
- final Predicate<String> isBaseColumnFn
- )
- {
- final Set<String> retVal = new HashSet<>();
-
- for (final String field : fields) {
- if (isBaseColumnFn.apply(field)) {
- retVal.add(field);
- }
- }
-
- return retVal;
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
new file mode 100644
index 00000000000..6dce9018b7a
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.google.common.collect.RangeSet;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.ShardSpec;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Uses a {@link DimFilter} to check the {@link
DimFilter#getDimensionRangeSet(String)} against
+ * {@link ShardSpec#possibleInDomain(Map)} in order to 'prune' a set of
segments whose rows would never match a filter
+ * and avoid processing those segments in the first place.
+ */
+public class FilterSegmentPruner implements SegmentPruner
+{
+ private final DimFilter filter;
+ private final Set<String> filterFields;
+ private final Map<String, Optional<RangeSet<String>>> rangeCache;
+
+ public FilterSegmentPruner(
+ DimFilter filter,
+ @Nullable Set<String> filterFields
+ )
+ {
+ InvalidInput.conditionalException(filter != null, "filter must not be
null");
+ this.filter = filter;
+ this.filterFields = filterFields == null ? filter.getRequiredColumns() :
filterFields;
+ this.rangeCache = new HashMap<>();
+ }
+
+ /**
+ * Filter the given iterable of objects by removing any object whose {@link
DataSegment}, obtained from the converter
+ * function, does not fit in the RangeSet of the dimFilter {@link
DimFilter#getDimensionRangeSet(String)}. The
+ * returned set contains the filtered objects in the same order as they
appear in input.
+ *
+ * {@link #rangeCache} stores the RangeSets of different dimensions for the
filter, so it can be re-used between
+ * calls to save redundant evaluation of {@link
DimFilter#getDimensionRangeSet(String)} on the same columns.
+ *
+ * @param input The iterable of objects to be filtered
+ * @param converter The function to convert T to {@link DataSegment} that
can be filtered by
+ * @param <T> This can be any type, as long as transform function is
provided to extract a {@link DataSegment}
+ *
+ * @return The set of pruned objects, in the same order as input
+ */
+ @Override
+ public <T> Collection<T> prune(Iterable<T> input, Function<T, DataSegment>
converter)
+ {
+ // LinkedHashSet retains order from "input".
+ final Set<T> retSet = new LinkedHashSet<>();
+
+ for (T obj : input) {
+ final DataSegment segment = converter.apply(obj);
+ if (segment == null) {
+ continue;
+ }
+ final ShardSpec shard = segment.getShardSpec();
+ boolean include = true;
+
+ if (shard != null) {
+ Map<String, RangeSet<String>> filterDomain = new HashMap<>();
+ List<String> dimensions = shard.getDomainDimensions();
+ for (String dimension : dimensions) {
+ if (filterFields == null || filterFields.contains(dimension)) {
+ Optional<RangeSet<String>> optFilterRangeSet =
+ rangeCache.computeIfAbsent(dimension, d ->
Optional.ofNullable(filter.getDimensionRangeSet(d)));
+
+ optFilterRangeSet.ifPresent(stringRangeSet ->
filterDomain.put(dimension, stringRangeSet));
+ }
+ }
+ if (!filterDomain.isEmpty() && !shard.possibleInDomain(filterDomain)) {
+ include = false;
+ }
+ }
+
+ if (include) {
+ retSet.add(obj);
+ }
+ }
+ return retSet;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FilterSegmentPruner that = (FilterSegmentPruner) o;
+ return Objects.equals(filter, that.filter) && Objects.equals(filterFields,
that.filterFields);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(filter, filterFields);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "FilterSegmentPruner{" +
+ "filter=" + filter +
+ ", filterFields=" + filterFields +
+ ", rangeCache=" + rangeCache +
+ '}';
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
b/processing/src/main/java/org/apache/druid/query/filter/SegmentPruner.java
similarity index 51%
copy from
multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
copy to
processing/src/main/java/org/apache/druid/query/filter/SegmentPruner.java
index 90a6970485e..4b372271659 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/SegmentPruner.java
@@ -17,24 +17,18 @@
* under the License.
*/
-package org.apache.druid.msq.input;
+package org.apache.druid.query.filter;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.timeline.DataSegment;
-/**
- * Slice of an {@link InputSpec} assigned to a particular worker.
- *
- * On the controller, these are produced using {@link InputSpecSlicer}. On
workers, these are transformed into
- * {@link ReadableSlice} using {@link InputSliceReader}.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-public interface InputSlice
+import java.util.Collection;
+import java.util.function.Function;
+
+public interface SegmentPruner
{
/**
- * Returns the number of files contained within this split. This is the same
number that would be added to
- * {@link org.apache.druid.msq.counters.CounterTracker} on full iteration
through {@link InputSliceReader#attach}.
- *
- * May be zero for some kinds of slices, even if they contain data, if the
input is not file-based.
+ * Filter the given {@link Iterable} of objects containing a {@link
DataSegment} (obtained from the converter
+ * function), to reduce the overall working set which needs to be processed.
*/
- int fileCount();
+ <T> Collection<T> prune(Iterable<T> input, Function<T, DataSegment>
converter);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
index b12dfd88e4d..d5baa72dd08 100644
---
a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
+++
b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
@@ -31,6 +31,8 @@ import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.filter.FilterSegmentPruner;
+import org.apache.druid.query.filter.SegmentPruner;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -39,8 +41,11 @@ import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.join.JoinPrefixUtils;
+import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* Represents the native engine's execution vertex - the execution unit it may
execute in one execution cycle.
@@ -82,6 +87,31 @@ import java.util.List;
*/
public class ExecutionVertex
{
+ /**
+ * Identifies the vertex for the given query.
+ */
+ public static ExecutionVertex of(Query<?> query)
+ {
+ ExecutionVertexExplorer explorer = new ExecutionVertexExplorer(query);
+ return new ExecutionVertex(explorer);
+ }
+
+ /**
+ * Builds the {@link ExecutionVertex} around a {@link DataSource}.
+ *
+ * Kept for backward compatibility reasons - incorporating
+ * {@link ExecutionVertex} into Filtration will make this obsolete.
+ */
+ public static ExecutionVertex ofDataSource(DataSource dataSource)
+ {
+ ScanQuery query = Druids
+ .newScanQueryBuilder()
+ .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
+ .dataSource(dataSource)
+ .build();
+ return ExecutionVertex.of(query);
+ }
+
/** The top level query this vertex is describing. */
protected final Query<?> topQuery;
/** The base datasource which will be read during the execution. */
@@ -90,6 +120,7 @@ public class ExecutionVertex
protected final QuerySegmentSpec querySegmentSpec;
/** Retained for compatibility with earlier implementation. See {@link
#isBaseColumn(String)} */
protected final List<String> joinPrefixes;
+
/** Retained for compatibility with earlier implementation. */
protected boolean allRightsAreGlobal;
@@ -102,15 +133,6 @@ public class ExecutionVertex
this.allRightsAreGlobal = explorer.allRightsAreGlobal;
}
- /**
- * Identifies the vertex for the given query.
- */
- public static ExecutionVertex of(Query<?> query)
- {
- ExecutionVertexExplorer explorer = new ExecutionVertexExplorer(query);
- return new ExecutionVertex(explorer);
- }
-
/**
* The base datasource input of this vertex.
*/
@@ -139,6 +161,141 @@ public class ExecutionVertex
|| (baseDataSource instanceof UnionDataSource && ((UnionDataSource)
baseDataSource).isTableBased());
}
+ /**
+ * Unwraps the {@link #getBaseDataSource()} if it is a {@link TableDataSource}.
+ *
+ * @throws DruidException error of type {@link
DruidException.Category#DEFENSIVE} if the {@link #getBaseDataSource()}
+ * is not a table. Note that this may not be true even if {@link
#isProcessable()} is true - in cases when the base
+ * datasource is a {@link UnionDataSource} of {@link TableDataSource}.
+ */
+ public final TableDataSource getBaseTableDataSource()
+ {
+ if (baseDataSource instanceof TableDataSource) {
+ return (TableDataSource) baseDataSource;
+ } else {
+ throw DruidException.defensive("Base dataSource[%s] is not a table!",
baseDataSource);
+ }
+ }
+
+ /**
+ * The applicable {@link QuerySegmentSpec} for this vertex.
+ *
+ * There might be more queries inside a single vertex; so the outer one is
not
+ * necessarily correct.
+ */
+ public QuerySegmentSpec getEffectiveQuerySegmentSpec()
+ {
+ Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec is null!");
+ return querySegmentSpec;
+ }
+
+ /**
+ * Decides if the query can be executed using the cluster walker.
+ */
+ public boolean canRunQueryUsingClusterWalker()
+ {
+ return isProcessable() && isTableBased();
+ }
+
+ /**
+ * Decides if the query can be executed using the local walker.
+ */
+ public boolean canRunQueryUsingLocalWalker()
+ {
+ return isProcessable() && !isTableBased();
+ }
+
+ /**
+ * Decides if the execution time segment mapping function will be expensive.
+ */
+ public boolean isSegmentMapFunctionExpensive()
+ {
+ boolean hasJoin = !joinPrefixes.isEmpty();
+ return hasJoin;
+ }
+
+ @Nullable
+ public SegmentPruner getSegmentPruner()
+ {
+ if (!topQuery.context().isSecondaryPartitionPruningEnabled()) {
+ return null;
+ }
+ if (topQuery.getFilter() == null) {
+ return null;
+ }
+ final Set<String> baseFields = new HashSet<>();
+ for (final String field : topQuery.getFilter().getRequiredColumns()) {
+ if (isBaseColumn(field)) {
+ baseFields.add(field);
+ }
+ }
+
+ return new FilterSegmentPruner(
+ topQuery.getFilter(),
+ baseFields
+ );
+ }
+
+ /**
+ * Answers if the given column is coming from the base datasource or not.
+ *
+ * Retained for backward compatibility for now. The approach taken here
relies
+ * on join prefixes - which might classify the output of a
+ * {@link VirtualColumn} to be coming from the base datasource. <br/>
+ * An alternate approach would be to analyze these during the segmentmap
+ * function creation.
+ */
+ public boolean isBaseColumn(String columnName)
+ {
+ for (String prefix : joinPrefixes) {
+ if (JoinPrefixUtils.isPrefixedBy(columnName, prefix)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Query buildQueryWithBaseDataSource(DataSource newBaseDataSource)
+ {
+ return new ReplaceBaseDataSource(baseDataSource,
newBaseDataSource).traverse(topQuery);
+ }
+
+ /**
+ * Assembles the segment mapping function which should be applied to the
input segments.
+ */
+ public SegmentMapFunction createSegmentMapFunction(PolicyEnforcer
policyEnforcer)
+ {
+ return
getTopDataSource().createSegmentMapFunction(topQuery).thenMap(segment -> {
+ segment.validateOrElseThrow(policyEnforcer);
+ return segment;
+ });
+ }
+
+ /**
+ * Returns the first datasource which is not collapsible by the topQuery.
+ */
+ private DataSource getTopDataSource()
+ {
+ Query<?> q = topQuery;
+ while (q.mayCollapseQueryDataSource()) {
+ q = ((QueryDataSource) q.getDataSource()).getQuery();
+ }
+ return q.getDataSource();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Explores the vertex and collects all details.
*/
@@ -233,100 +390,6 @@ public class ExecutionVertex
}
}
- /**
- * Builds the {@link ExecutionVertex} around a {@link DataSource}.
- *
- * Kept for backward compatibility reasons - incorporating
- * {@link ExecutionVertex} into Filtration will make this obsolete.
- */
- public static ExecutionVertex ofDataSource(DataSource dataSource)
- {
- ScanQuery query = Druids
- .newScanQueryBuilder()
- .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
- .dataSource(dataSource)
- .build();
- return ExecutionVertex.of(query);
- }
-
- /**
- * Unwraps the {@link #getBaseDataSource()} if its a {@link TableDataSource}.
- *
- * @throws DruidException error of type {@link
DruidException.Category#DEFENSIVE} if the {@link #getBaseDataSource()}
- * is not a table. Note that this may not be true even {@link
#isProcessable()} ()} is true - in cases when the base
- * datasource is a {@link UnionDataSource} of {@link TableDataSource}.
- */
- public final TableDataSource getBaseTableDataSource()
- {
- if (baseDataSource instanceof TableDataSource) {
- return (TableDataSource) baseDataSource;
- } else {
- throw DruidException.defensive("Base dataSource[%s] is not a table!",
baseDataSource);
- }
- }
-
- /**
- * The applicable {@link QuerySegmentSpec} for this vertex.
- *
- * There might be more queries inside a single vertex; so the outer one is
not
- * necessary correct.
- */
- public QuerySegmentSpec getEffectiveQuerySegmentSpec()
- {
- Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec is null!");
- return querySegmentSpec;
- }
-
- /**
- * Decides if the query can be executed using the cluster walker.
- */
- public boolean canRunQueryUsingClusterWalker()
- {
- return isProcessable() && isTableBased();
- }
-
- /**
- * Decides if the query can be executed using the local walker.
- */
- public boolean canRunQueryUsingLocalWalker()
- {
- return isProcessable() && !isTableBased();
- }
-
- /**
- * Decides if the execution time segment mapping function will be expensive.
- */
- public boolean isSegmentMapFunctionExpensive()
- {
- boolean hasJoin = !joinPrefixes.isEmpty();
- return hasJoin;
- }
-
- /**
- * Answers if the given column is coming from the base datasource or not.
- *
- * Retained for backward compatibility for now. The approach taken here
relies
- * on join prefixes - which might classify the output of a
- * {@link VirtualColumn} to be coming from the base datasource. <br/>
- * An alternate approach would be to analyze these during the segmentmap
- * function creation.
- */
- public boolean isBaseColumn(String columnName)
- {
- for (String prefix : joinPrefixes) {
- if (JoinPrefixUtils.isPrefixedBy(columnName, prefix)) {
- return false;
- }
- }
- return true;
- }
-
- @SuppressWarnings("rawtypes")
- public Query buildQueryWithBaseDataSource(DataSource newBaseDataSource)
- {
- return new ReplaceBaseDataSource(baseDataSource,
newBaseDataSource).traverse(topQuery);
- }
-
/**
* Replaces the base datasource of the given query.
*/
@@ -368,39 +431,4 @@ public class ExecutionVertex
return query;
}
}
-
- @Override
- public boolean equals(Object obj)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int hashCode()
- {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Assembles the segment mapping function which should be applied to the
input segments.
- */
- public SegmentMapFunction createSegmentMapFunction(PolicyEnforcer
policyEnforcer)
- {
- return
getTopDataSource().createSegmentMapFunction(topQuery).thenMap(segment -> {
- segment.validateOrElseThrow(policyEnforcer);
- return segment;
- });
- }
-
- /**
- * Returns the first datasource which is not collapsible by the topQuery.
- */
- private DataSource getTopDataSource()
- {
- Query<?> q = topQuery;
- while (q.mayCollapseQueryDataSource()) {
- q = ((QueryDataSource) q.getDataSource()).getQuery();
- }
- return q.getDataSource();
- }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
deleted file mode 100644
index 179abe02121..00000000000
---
a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.filter;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableRangeSet;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Range;
-import com.google.common.collect.RangeSet;
-import org.apache.druid.timeline.partition.ShardSpec;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-public class DimFilterUtilsTest
-{
- private static final Function<ShardSpec, ShardSpec> CONVERTER = new
Function<>()
- {
- @Nullable
- @Override
- public ShardSpec apply(@Nullable ShardSpec input)
- {
- return input;
- }
- };
-
- @Test
- public void testFilterShards()
- {
- DimFilter filter1 = EasyMock.createMock(DimFilter.class);
- EasyMock.expect(filter1.getDimensionRangeSet("dim1"))
- .andReturn(rangeSet(ImmutableList.of(Range.lessThan("abc"))))
- .anyTimes();
- EasyMock.expect(filter1.getDimensionRangeSet("dim2"))
- .andReturn(null)
- .anyTimes();
-
- ShardSpec shard1 = shardSpec("dim1", true);
- ShardSpec shard2 = shardSpec("dim1", false);
- ShardSpec shard3 = shardSpec("dim1", false);
- ShardSpec shard4 = shardSpec("dim2", false);
- ShardSpec shard5 = shardSpec("dim2", false);
- ShardSpec shard6 = shardSpec("dim2", false);
- ShardSpec shard7 = shardSpec("dim2", false);
-
- List<ShardSpec> shards = ImmutableList.of(shard1, shard2, shard3, shard4,
shard5, shard6, shard7);
- EasyMock.replay(filter1, shard1, shard2, shard3, shard4, shard5, shard6,
shard7);
-
- Set<ShardSpec> expected1 = ImmutableSet.of(shard1, shard4, shard5, shard6,
shard7);
- assertFilterResult(filter1, null, shards, expected1);
- assertFilterResult(filter1, Collections.singleton("dim1"), shards,
expected1);
- assertFilterResult(filter1, Collections.singleton("dim2"), shards,
ImmutableSet.copyOf(shards));
- assertFilterResult(filter1, Collections.emptySet(), shards,
ImmutableSet.copyOf(shards));
- }
-
- private void assertFilterResult(
- DimFilter filter,
- Set<String> filterFields,
- Iterable<ShardSpec> input,
- Set<ShardSpec> expected
- )
- {
- Set<ShardSpec> result = new HashSet<>();
- Map<String, Optional<RangeSet<String>>> dimensionRangeMap = new
HashMap<>();
- for (ShardSpec shard : input) {
- result.addAll(
- DimFilterUtils.filterShards(
- filter,
- filterFields,
- ImmutableList.of(shard),
- CONVERTER,
- dimensionRangeMap
- )
- );
- }
- Assert.assertEquals(expected, result);
- }
-
- private static RangeSet<String> rangeSet(List<Range<String>> ranges)
- {
- ImmutableRangeSet.Builder<String> builder = ImmutableRangeSet.builder();
- for (Range<String> range : ranges) {
- builder.add(range);
- }
- return builder.build();
- }
-
- private static ShardSpec shardSpec(String dimension, boolean contained)
- {
- ShardSpec shard = EasyMock.createMock(ShardSpec.class);
- EasyMock.expect(shard.getDomainDimensions())
- .andReturn(ImmutableList.of(dimension))
- .anyTimes();
- EasyMock.expect(shard.possibleInDomain(EasyMock.anyObject()))
- .andReturn(contained)
- .anyTimes();
- return shard;
- }
-}
diff --git
a/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
new file mode 100644
index 00000000000..131337ad468
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+class FilterSegmentPrunerTest
+{
+ @Test
+ void testNullFilter()
+ {
+ Throwable t = Assertions.assertThrows(
+ DruidException.class,
+ () -> new FilterSegmentPruner(null, null)
+ );
+ Assertions.assertEquals("filter must not be null", t.getMessage());
+ }
+
+ @Test
+ void testPrune()
+ {
+ DimFilter range_a = new RangeFilter("dim1", ColumnType.STRING, null,
"aaa", null, null, null);
+ DimFilter expression_b = new ExpressionDimFilter("dim2 == 'c'", null,
TestExprMacroTable.INSTANCE);
+
+ String interval1 = "2026-02-18T00:00:00Z/2026-02-19T00:00:00Z";
+ String interval2 = "2026-02-19T00:00:00Z/2026-02-20T00:00:00Z";
+
+ DataSegment seg1 = makeDataSegment(interval1, makeRange("dim1", 0, null,
"abc"));
+ DataSegment seg2 = makeDataSegment(interval1, makeRange("dim1", 1, "abc",
"lmn"));
+ DataSegment seg3 = makeDataSegment(interval1, makeRange("dim1", 2, "lmn",
null));
+ DataSegment seg4 = makeDataSegment(interval2, makeRange("dim2", 0, null,
"b"));
+ DataSegment seg5 = makeDataSegment(interval2, makeRange("dim2", 1, "b",
"j"));
+ DataSegment seg6 = makeDataSegment(interval2, makeRange("dim2", 2, "j",
"s"));
+ DataSegment seg7 = makeDataSegment(interval2, makeRange("dim2", 3, "s",
"t"));
+
+ List<DataSegment> segs = List.of(seg1, seg2, seg3, seg4, seg5, seg6, seg7);
+
+ FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null);
+ FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a,
Collections.emptySet());
+ FilterSegmentPruner prunerExpression = new
FilterSegmentPruner(expression_b, null);
+
+ Assertions.assertEquals(Set.of(seg1, seg4, seg5, seg6, seg7),
prunerRange.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.copyOf(segs), prunerExpression.prune(segs,
Function.identity()));
+ Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs,
Function.identity()));
+ }
+
+ @Test
+ void testEqualsAndHashcode()
+ {
+
EqualsVerifier.forClass(FilterSegmentPruner.class).usingGetClass().withIgnoredFields("rangeCache").verify();
+ }
+
+ private ShardSpec makeRange(
+ String column,
+ int partitionNumber,
+ @Nullable String start,
+ @Nullable String end
+ )
+ {
+ return makeRange(
+ List.of(column),
+ partitionNumber,
+ start == null ? null : StringTuple.create(start),
+ end == null ? null : StringTuple.create(end)
+ );
+ }
+
+ private ShardSpec makeRange(
+ List<String> columns,
+ int partitionNumber,
+ @Nullable StringTuple start,
+ @Nullable StringTuple end
+ )
+ {
+ return new DimensionRangeShardSpec(
+ columns,
+ start,
+ end,
+ partitionNumber,
+ 0
+ );
+ }
+
+ private DataSegment makeDataSegment(String intervalString, ShardSpec
shardSpec)
+ {
+ Interval interval = Intervals.of(intervalString);
+ return DataSegment.builder(SegmentId.of("prune-test", interval, "0",
shardSpec))
+ .shardSpec(shardSpec)
+ .build();
+ }
+}
diff --git
a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/qaWin/sql_explain.msq.iq
b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/qaWin/sql_explain.msq.iq
index 2f1a50c377b..9891f57e668 100644
---
a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/qaWin/sql_explain.msq.iq
+++
b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/qaWin/sql_explain.msq.iq
@@ -66,14 +66,7 @@ WHERE client_ip IN ('107.13.54.103',
"input" : [ {
"type" : "table",
"dataSource" : "test_win",
- "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
- "filter" : {
- "type" : "inType",
- "column" : "client_ip",
- "matchValueType" : "STRING",
- "sortedValues" : [ "107.13.54.103", "99.9.55.22" ]
- },
- "filterFields" : [ "client_ip" ]
+ "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
} ],
"processor" : {
"type" : "groupByPreShuffle",
@@ -726,14 +719,7 @@ WHERE client_ip IN ('107.13.54.103',
"input" : [ {
"type" : "table",
"dataSource" : "test_win",
- "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
- "filter" : {
- "type" : "inType",
- "column" : "client_ip",
- "matchValueType" : "STRING",
- "sortedValues" : [ "107.13.54.103", "99.9.55.22" ]
- },
- "filterFields" : [ "client_ip" ]
+ "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
} ],
"processor" : {
"type" : "groupByPreShuffle",
@@ -1380,14 +1366,7 @@ GROUP BY server_ip,
"input" : [ {
"type" : "table",
"dataSource" : "test_win",
- "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
- "filter" : {
- "type" : "inType",
- "column" : "client_ip",
- "matchValueType" : "STRING",
- "sortedValues" : [ "107.13.54.103", "99.9.55.22" ]
- },
- "filterFields" : [ "client_ip" ]
+ "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
} ],
"processor" : {
"type" : "groupByPreShuffle",
@@ -2017,14 +1996,7 @@ GROUP BY server_ip,
"input" : [ {
"type" : "table",
"dataSource" : "test_win",
- "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
- "filter" : {
- "type" : "inType",
- "column" : "client_ip",
- "matchValueType" : "STRING",
- "sortedValues" : [ "107.13.54.103", "99.9.55.22" ]
- },
- "filterFields" : [ "client_ip" ]
+ "intervals" : [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
} ],
"processor" : {
"type" : "groupByPreShuffle",
diff --git
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 33237f372a2..eb0a5a83997 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
-import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
@@ -72,7 +71,7 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.ResponseContext;
-import org.apache.druid.query.filter.DimFilterUtils;
+import org.apache.druid.query.filter.SegmentPruner;
import org.apache.druid.query.planning.ExecutionVertex;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.QueryScheduler;
@@ -91,6 +90,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -442,32 +442,16 @@ public class CachingClusteredClient implements
QuerySegmentWalker
);
final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
- final Map<String, Optional<RangeSet<String>>> dimensionRangeCache;
- final Set<String> filterFieldsForPruning;
-
- final boolean trySecondaryPartititionPruning =
- query.getFilter() != null &&
query.context().isSecondaryPartitionPruningEnabled();
-
- if (trySecondaryPartititionPruning) {
- dimensionRangeCache = new HashMap<>();
- filterFieldsForPruning =
-
DimFilterUtils.onlyBaseFields(query.getFilter().getRequiredColumns(),
ev::isBaseColumn);
- } else {
- dimensionRangeCache = null;
- filterFieldsForPruning = null;
- }
+ final SegmentPruner segmentPruner = ev.getSegmentPruner();
boolean isRealtimeSegmentOnly = query.context().isRealtimeSegmentsOnly();
// Filter unneeded chunks based on partition dimension
for (TimelineObjectHolder<String, ServerSelector> holder :
serversLookup) {
- final Set<PartitionChunk<ServerSelector>> filteredChunks;
- if (trySecondaryPartititionPruning) {
- filteredChunks = DimFilterUtils.filterShards(
- query.getFilter(),
- filterFieldsForPruning,
+ final Collection<PartitionChunk<ServerSelector>> filteredChunks;
+ if (segmentPruner != null) {
+ filteredChunks = segmentPruner.prune(
holder.getObject(),
- partitionChunk ->
partitionChunk.getObject().getSegment().getShardSpec(),
- dimensionRangeCache
+ partitionChunk -> partitionChunk.getObject().getSegment()
);
} else {
filteredChunks = Sets.newLinkedHashSet(holder.getObject());
diff --git a/web-console/src/druid-models/stages/stages.mock.ts
b/web-console/src/druid-models/stages/stages.mock.ts
index 8b056c1f3ae..c10c5794571 100644
--- a/web-console/src/druid-models/stages/stages.mock.ts
+++ b/web-console/src/druid-models/stages/stages.mock.ts
@@ -215,13 +215,6 @@ export const STAGES = new Stages(
type: 'table',
dataSource: 'kttm_simple',
intervals:
['-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z'],
- filter: {
- type: 'equals',
- column: 'os',
- matchValueType: 'STRING',
- matchValue: 'iOS',
- },
- filterFields: ['os'],
},
{
type: 'stage',
diff --git a/web-console/src/druid-models/stages/stages.ts
b/web-console/src/druid-models/stages/stages.ts
index cbb67a26b5b..6831891a8b0 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -71,8 +71,6 @@ export type StageInput =
type: 'table';
dataSource: string;
intervals: string[];
- filter?: any;
- filterFields?: string[];
}
| {
type: 'external';
diff --git
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
index 26a1f005ff6..ff43e3f8f99 100644
---
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
+++
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
@@ -19,7 +19,6 @@
import { Button, Icon, Intent } from '@blueprintjs/core';
import { IconNames } from '@blueprintjs/icons';
import classNames from 'classnames';
-import * as JSONBig from 'json-bigint-native';
import React from 'react';
import type { Column } from 'react-table';
import ReactTable from 'react-table';
@@ -78,7 +77,6 @@ function summarizeTableInput(tableStageInput: StageInput):
string {
return assemble(
`Datasource: ${tableStageInput.dataSource}`,
`Interval: ${tableStageInput.intervals.join('; ')}`,
- tableStageInput.filter && `Filter:
${JSONBig.stringify(tableStageInput.filter)}`,
).join('\n');
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]