This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2dd073c2cd Pass metrics object for Scan, Timeseries and GroupBy
queries during cursor creation (#12484)
2dd073c2cd is described below
commit 2dd073c2cdd1a969a703612b433e4d7820bedc8b
Author: Rohan Garg <[email protected]>
AuthorDate: Mon May 9 23:10:17 2022 +0530
Pass metrics object for Scan, Timeseries and GroupBy queries during cursor
creation (#12484)
* Pass metrics object for Scan, Timeseries and GroupBy queries during
cursor creation
* fixup! Pass metrics object for Scan, Timeseries and GroupBy queries
during cursor creation
* Document vectorized dimension
---
docs/operations/metrics.md | 2 +-
.../DistinctCountTimeseriesQueryTest.java | 3 ++-
.../org/apache/druid/query/QueryRunnerHelper.java | 6 +++--
.../druid/query/groupby/GroupByQueryEngine.java | 8 +++++--
.../query/groupby/GroupByQueryRunnerFactory.java | 4 +++-
.../epinephelinae/GroupByQueryEngineV2.java | 17 +++++++++-----
.../epinephelinae/vector/VectorGroupByEngine.java | 6 +++--
.../query/groupby/strategy/GroupByStrategy.java | 8 +++++--
.../query/groupby/strategy/GroupByStrategyV1.java | 13 ++++++++---
.../query/groupby/strategy/GroupByStrategyV2.java | 11 +++++++--
.../apache/druid/query/scan/ScanQueryEngine.java | 7 ++++--
.../druid/query/scan/ScanQueryRunnerFactory.java | 2 +-
.../TimeBoundaryQueryRunnerFactory.java | 3 ++-
.../query/timeseries/TimeseriesQueryEngine.java | 21 +++++++++++------
.../timeseries/TimeseriesQueryRunnerFactory.java | 2 +-
.../IncrementalIndexStorageAdapter.java | 4 ++++
.../first/StringFirstTimeseriesQueryTest.java | 6 +++--
.../last/StringLastTimeseriesQueryTest.java | 6 +++--
.../query/groupby/GroupByQueryRunnerTest.java | 17 ++++++++++++--
.../groupby/GroupByQueryRunnerTestHelper.java | 26 ++++++++++++++++++++++
.../druid/query/scan/ScanQueryRunnerTest.java | 15 ++++++++++++-
.../timeseries/TimeseriesQueryRunnerTest.java | 18 ++++++++++++++-
.../IncrementalIndexStorageAdapterTest.java | 9 +++++---
23 files changed, 169 insertions(+), 45 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index dd442bfbf8..eb86c91702 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -64,7 +64,7 @@ Metrics may have additional dimensions beyond those listed
above.
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource,
type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation
Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN:
threshold, dimension.|< 1s|
-|`query/segment/time`|Milliseconds taken to query individual segment. Includes
time to page in the segment from disk.|id, status, segment.|several hundred
milliseconds|
+|`query/segment/time`|Milliseconds taken to query individual segment. Includes
time to page in the segment from disk.|id, status, segment, vectorized.|several
hundred milliseconds|
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id,
segment.|< several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be
scanned.||Close to 0|
|`query/segmentAndCache/time`|Milliseconds taken to query individual segment
or hit the cache (if it is enabled on the Historical process).|id,
segment.|several hundred milliseconds|
diff --git
a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
index c218004375..c5faefc65c 100644
---
a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
+++
b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@@ -100,7 +101,7 @@ public class DistinctCountTimeseriesQueryTest extends
InitializedNullHandlingTes
.build();
final Iterable<Result<TimeseriesResultValue>> results =
- engine.process(query, new
IncrementalIndexStorageAdapter(index)).toList();
+ engine.process(query, new IncrementalIndexStorageAdapter(index), new
DefaultTimeseriesQueryMetrics()).toList();
List<Result<TimeseriesResultValue>> expectedResults =
Collections.singletonList(
new Result<>(
diff --git
a/processing/src/main/java/org/apache/druid/query/QueryRunnerHelper.java
b/processing/src/main/java/org/apache/druid/query/QueryRunnerHelper.java
index 8788778157..88b12e81c8 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryRunnerHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryRunnerHelper.java
@@ -31,6 +31,7 @@ import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
@@ -46,7 +47,8 @@ public class QueryRunnerHelper
final VirtualColumns virtualColumns,
final boolean descending,
final Granularity granularity,
- final Function<Cursor, Result<T>> mapFn
+ final Function<Cursor, Result<T>> mapFn,
+ @Nullable final QueryMetrics<?> queryMetrics
)
{
Preconditions.checkArgument(
@@ -55,7 +57,7 @@ public class QueryRunnerHelper
return Sequences.filter(
Sequences.map(
- adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns,
granularity, descending, null),
+ adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns,
granularity, descending, queryMetrics),
mapFn
),
Objects::nonNull
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
index 62a82dc844..611d008406 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java
@@ -85,7 +85,11 @@ public class GroupByQueryEngine
this.intermediateResultsBufferPool = intermediateResultsBufferPool;
}
- public Sequence<Row> process(final GroupByQuery query, final StorageAdapter
storageAdapter)
+ public Sequence<Row> process(
+ final GroupByQuery query,
+ final StorageAdapter storageAdapter,
+ @Nullable final GroupByQueryMetrics groupByQueryMetrics
+ )
{
if (storageAdapter == null) {
throw new ISE(
@@ -112,7 +116,7 @@ public class GroupByQueryEngine
query.getVirtualColumns(),
query.getGranularity(),
false,
- null
+ groupByQueryMetrics
);
final ResourceHolder<ByteBuffer> bufferHolder =
intermediateResultsBufferPool.take();
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java
index 51a7d1a5aa..6553f4c4ee 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java
@@ -102,7 +102,9 @@ public class GroupByQueryRunnerFactory implements
QueryRunnerFactory<ResultRow,
throw new ISE("Got a [%s] which isn't a %s", query.getClass(),
GroupByQuery.class);
}
- return strategySelector.strategize((GroupByQuery)
query).process((GroupByQuery) query, adapter);
+ return strategySelector
+ .strategize((GroupByQuery) query)
+ .process((GroupByQuery) query, adapter, (GroupByQueryMetrics)
queryPlus.getQueryMetrics());
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index c6fc53b82b..a61a592bb2 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -41,6 +41,7 @@ import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import
org.apache.druid.query.groupby.epinephelinae.column.ArrayDoubleGroupByColumnSelectorStrategy;
import
org.apache.druid.query.groupby.epinephelinae.column.ArrayLongGroupByColumnSelectorStrategy;
@@ -90,7 +91,7 @@ import java.util.stream.Stream;
* This code runs on data servers, like Historicals.
*
* Used by
- * {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter)}.
+ * {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter,
GroupByQueryMetrics)}.
*/
public class GroupByQueryEngineV2
{
@@ -119,7 +120,8 @@ public class GroupByQueryEngineV2
final GroupByQuery query,
@Nullable final StorageAdapter storageAdapter,
final NonBlockingPool<ByteBuffer> intermediateResultsBufferPool,
- final GroupByQueryConfig querySpecificConfig
+ final GroupByQueryConfig querySpecificConfig,
+ @Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
if (storageAdapter == null) {
@@ -161,7 +163,8 @@ public class GroupByQueryEngineV2
fudgeTimestamp,
filter,
interval,
- querySpecificConfig
+ querySpecificConfig,
+ groupByQueryMetrics
);
} else {
result = processNonVectorized(
@@ -171,7 +174,8 @@ public class GroupByQueryEngineV2
fudgeTimestamp,
querySpecificConfig,
filter,
- interval
+ interval,
+ groupByQueryMetrics
);
}
@@ -190,7 +194,8 @@ public class GroupByQueryEngineV2
@Nullable final DateTime fudgeTimestamp,
final GroupByQueryConfig querySpecificConfig,
@Nullable final Filter filter,
- final Interval interval
+ final Interval interval,
+ @Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
@@ -199,7 +204,7 @@ public class GroupByQueryEngineV2
query.getVirtualColumns(),
query.getGranularity(),
false,
- null
+ groupByQueryMetrics
);
return cursors.flatMap(
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
index f5914a565e..2c7811c8ff 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -33,6 +33,7 @@ import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.AggregateResult;
import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper;
@@ -128,7 +129,8 @@ public class VectorGroupByEngine
@Nullable final DateTime fudgeTimestamp,
@Nullable final Filter filter,
final Interval interval,
- final GroupByQueryConfig config
+ final GroupByQueryConfig config,
+ @Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
if (!canVectorize(query, storageAdapter, filter)) {
@@ -147,7 +149,7 @@ public class VectorGroupByEngine
query.getVirtualColumns(),
false,
QueryContexts.getVectorSize(query),
- null
+ groupByQueryMetrics
);
if (cursor == null) {
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java
index df48c96872..87e6a3dcac 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java
@@ -27,6 +27,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
@@ -164,7 +165,7 @@ public interface GroupByStrategy
/**
* Merge a variety of single-segment query runners into a combined runner.
Used by
* {@link
org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(QueryProcessingPool,
Iterable)}. In
- * that sense, it is intended to go along with {@link #process(GroupByQuery,
StorageAdapter)} (the runners created
+ * that sense, it is intended to go along with {@link #process(GroupByQuery,
StorageAdapter, GroupByQueryMetrics)} (the runners created
* by that method will be fed into this method).
* <p>
* This method is only called on data servers, like Historicals (not the
Broker).
@@ -187,7 +188,10 @@ public interface GroupByStrategy
*
* @return result sequence for the storage adapter
*/
- Sequence<ResultRow> process(GroupByQuery query, StorageAdapter
storageAdapter);
+ Sequence<ResultRow> process(
+ GroupByQuery query,
+ StorageAdapter storageAdapter,
+ @Nullable GroupByQueryMetrics groupByQueryMetrics);
/**
* Returns whether this strategy supports pushing down outer queries. This
is used by
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java
index c25335f7ce..aeb3d2b800 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java
@@ -41,6 +41,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryEngine;
import org.apache.druid.query.groupby.GroupByQueryHelper;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
@@ -51,6 +52,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -233,7 +235,8 @@ public class GroupByStrategyV1 implements GroupByStrategy
outerQuery.withQuerySegmentSpec(
new
MultipleIntervalSegmentSpec(ImmutableList.of(interval))
),
- new
IncrementalIndexStorageAdapter(innerQueryResultIndex)
+ new
IncrementalIndexStorageAdapter(innerQueryResultIndex),
+ null
);
}
}
@@ -269,10 +272,14 @@ public class GroupByStrategyV1 implements GroupByStrategy
}
@Override
- public Sequence<ResultRow> process(final GroupByQuery query, final
StorageAdapter storageAdapter)
+ public Sequence<ResultRow> process(
+ final GroupByQuery query,
+ final StorageAdapter storageAdapter,
+ @Nullable final GroupByQueryMetrics groupByQueryMetrics
+ )
{
return Sequences.map(
- engine.process(query, storageAdapter),
+ engine.process(query, storageAdapter, groupByQueryMetrics),
row -> GroupByQueryHelper.toResultRow(query, row)
);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index f8823d62ac..abb5cdf65d 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -59,6 +59,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import
org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
@@ -73,6 +74,7 @@ import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.utils.CloseableUtils;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
@@ -690,13 +692,18 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
@Override
- public Sequence<ResultRow> process(GroupByQuery query, StorageAdapter
storageAdapter)
+ public Sequence<ResultRow> process(
+ GroupByQuery query,
+ StorageAdapter storageAdapter,
+ @Nullable GroupByQueryMetrics groupByQueryMetrics
+ )
{
return GroupByQueryEngineV2.process(
query,
storageAdapter,
bufferPool,
- configSupplier.get().withOverrides(query)
+ configSupplier.get().withOverrides(query),
+ groupByQueryMetrics
);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
index 112fdeaa1b..fccc3a2c86 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.Filter;
@@ -44,6 +45,7 @@ import org.apache.druid.segment.filter.Filters;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -61,7 +63,8 @@ public class ScanQueryEngine
public Sequence<ScanResultValue> process(
final ScanQuery query,
final Segment segment,
- final ResponseContext responseContext
+ final ResponseContext responseContext,
+ @Nullable final QueryMetrics<?> queryMetrics
)
{
if (segment.asQueryableIndex() != null &&
segment.asQueryableIndex().isFromTombstone()) {
@@ -135,7 +138,7 @@ public class ScanQueryEngine
Granularities.ALL,
query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
(query.getTimeOrder().equals(ScanQuery.Order.NONE) &&
query.isDescending()),
- null
+ queryMetrics
)
.map(cursor -> new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>()
diff --git
a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index 7133241836..8aec07679b 100644
---
a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -373,7 +373,7 @@ public class ScanQueryRunnerFactory implements
QueryRunnerFactory<ScanResultValu
if (timeoutAt == null || timeoutAt == 0L) {
responseContext.putTimeoutTime(JodaUtils.MAX_INSTANT);
}
- return engine.process((ScanQuery) query, segment, responseContext);
+ return engine.process((ScanQuery) query, segment, responseContext,
queryPlus.getQueryMetrics());
}
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
index 2fe21dafbf..71a1938eda 100644
---
a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
@@ -120,7 +120,8 @@ public class TimeBoundaryQueryRunnerFactory
VirtualColumns.EMPTY,
descending,
Granularities.ALL,
- this.skipToFirstMatching
+ this.skipToFirstMatching,
+ null
);
final List<Result<DateTime>> resultList =
resultSequence.limit(1).toList();
if (resultList.size() > 0) {
diff --git
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
index e93cc973c6..6981751e12 100644
---
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
+++
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
@@ -82,7 +82,11 @@ public class TimeseriesQueryEngine
* Run a single-segment, single-interval timeseries query on a particular
adapter. The query must have been
* scoped down to a single interval before calling this method.
*/
- public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery
query, final StorageAdapter adapter)
+ public Sequence<Result<TimeseriesResultValue>> process(
+ final TimeseriesQuery query,
+ final StorageAdapter adapter,
+ @Nullable final TimeseriesQueryMetrics timeseriesQueryMetrics
+ )
{
if (adapter == null) {
throw new SegmentMissingException(
@@ -106,9 +110,9 @@ public class TimeseriesQueryEngine
final Sequence<Result<TimeseriesResultValue>> result;
if (doVectorize) {
- result = processVectorized(query, adapter, filter, interval, gran,
descending);
+ result = processVectorized(query, adapter, filter, interval, gran,
descending, timeseriesQueryMetrics);
} else {
- result = processNonVectorized(query, adapter, filter, interval, gran,
descending);
+ result = processNonVectorized(query, adapter, filter, interval, gran,
descending, timeseriesQueryMetrics);
}
final int limit = query.getLimit();
@@ -125,7 +129,8 @@ public class TimeseriesQueryEngine
@Nullable final Filter filter,
final Interval queryInterval,
final Granularity gran,
- final boolean descending
+ final boolean descending,
+ final TimeseriesQueryMetrics timeseriesQueryMetrics
)
{
final boolean skipEmptyBuckets = query.isSkipEmptyBuckets();
@@ -137,7 +142,7 @@ public class TimeseriesQueryEngine
query.getVirtualColumns(),
descending,
QueryContexts.getVectorSize(query),
- null
+ timeseriesQueryMetrics
);
if (cursor == null) {
@@ -251,7 +256,8 @@ public class TimeseriesQueryEngine
@Nullable final Filter filter,
final Interval queryInterval,
final Granularity gran,
- final boolean descending
+ final boolean descending,
+ final TimeseriesQueryMetrics timeseriesQueryMetrics
)
{
final boolean skipEmptyBuckets = query.isSkipEmptyBuckets();
@@ -299,7 +305,8 @@ public class TimeseriesQueryEngine
agg.close();
}
}
- }
+ },
+ timeseriesQueryMetrics
);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java
index 2d04905b3b..fe3d420e56 100644
---
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java
@@ -99,7 +99,7 @@ public class TimeseriesQueryRunnerFactory
throw new ISE("Got a [%s] which isn't a %s", input.getClass(),
TimeseriesQuery.class);
}
- return engine.process((TimeseriesQuery) input, adapter);
+ return engine.process((TimeseriesQuery) input, adapter,
(TimeseriesQueryMetrics) queryPlus.getQueryMetrics());
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
index fb424f8e21..9a4951cfc4 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
@@ -262,6 +262,10 @@ public class IncrementalIndexStorageAdapter implements
StorageAdapter
return Sequences.empty();
}
+ if (queryMetrics != null) {
+ queryMetrics.vectorized(false);
+ }
+
final Interval dataInterval = new Interval(getMinTime(),
gran.bucketEnd(getMaxTime()));
if (!interval.overlaps(dataInterval)) {
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
index 355a25c177..250525ca1b 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@@ -140,11 +141,12 @@ public class StringFirstTimeseriesQueryTest extends
InitializedNullHandlingTest
)
);
+ final DefaultTimeseriesQueryMetrics defaultTimeseriesQueryMetrics = new
DefaultTimeseriesQueryMetrics();
final Iterable<Result<TimeseriesResultValue>> iiResults =
- engine.process(query, new
IncrementalIndexStorageAdapter(incrementalIndex)).toList();
+ engine.process(query, new
IncrementalIndexStorageAdapter(incrementalIndex),
defaultTimeseriesQueryMetrics).toList();
final Iterable<Result<TimeseriesResultValue>> qiResults =
- engine.process(query, new
QueryableIndexStorageAdapter(queryableIndex)).toList();
+ engine.process(query, new
QueryableIndexStorageAdapter(queryableIndex),
defaultTimeseriesQueryMetrics).toList();
TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental
index");
TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable
index");
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
index 235dfdf450..6c017bab4a 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@@ -139,11 +140,12 @@ public class StringLastTimeseriesQueryTest
)
);
+ final DefaultTimeseriesQueryMetrics defaultTimeseriesQueryMetrics = new
DefaultTimeseriesQueryMetrics();
final Iterable<Result<TimeseriesResultValue>> iiResults =
- engine.process(query, new
IncrementalIndexStorageAdapter(incrementalIndex)).toList();
+ engine.process(query, new
IncrementalIndexStorageAdapter(incrementalIndex),
defaultTimeseriesQueryMetrics).toList();
final Iterable<Result<TimeseriesResultValue>> qiResults =
- engine.process(query, new
QueryableIndexStorageAdapter(queryableIndex)).toList();
+ engine.process(query, new
QueryableIndexStorageAdapter(queryableIndex),
defaultTimeseriesQueryMetrics).toList();
TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental
index");
TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable
index");
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index bd8e4905ae..23cc2cdc2b 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -50,6 +50,7 @@ import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentResultValue;
@@ -207,6 +208,7 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
private static final Closer RESOURCE_CLOSER = Closer.create();
private final QueryRunner<ResultRow> runner;
+ private final QueryRunner<ResultRow> originalRunner;
private final GroupByQueryRunnerFactory factory;
private final GroupByQueryConfig config;
private final boolean vectorize;
@@ -449,7 +451,9 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
final String testName = StringUtils.format("config=%s, runner=%s,
vectorize=%s", config, runner, vectorize);
// Add vectorization tests for any indexes that support it.
- if (!vectorize ||
QueryRunnerTestHelper.isTestRunnerVectorizable(runner)) {
+ if (!vectorize ||
+ (QueryRunnerTestHelper.isTestRunnerVectorizable(runner) &&
+
config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2))) {
constructors.add(new Object[]{testName, config, factory, runner,
vectorize});
}
}
@@ -476,6 +480,7 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
this.config = config;
this.factory = factory;
this.runner = factory.mergeRunners(Execs.directExecutor(),
ImmutableList.of(runner));
+ this.originalRunner = runner;
String runnerName = runner.toString();
this.vectorize = vectorize;
}
@@ -752,7 +757,15 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
)
);
- Iterable<ResultRow> results =
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+ StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
+ Iterable<ResultRow> results =
GroupByQueryRunnerTestHelper.runQueryWithEmitter(
+ factory,
+ originalRunner,
+ query,
+ serviceEmitter
+ );
+ Assert.assertEquals(1, serviceEmitter.getEvents().size());
+ Assert.assertEquals(vectorize,
serviceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null));
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java
index 6356a1535a..e433a27bee 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java
@@ -22,7 +22,9 @@ package org.apache.druid.query.groupby;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@@ -54,6 +56,30 @@ public class GroupByQueryRunnerTestHelper
return queryResult.toList();
}
+ public static <T> Iterable<T> runQueryWithEmitter(
+ QueryRunnerFactory factory,
+ QueryRunner runner,
+ Query<T> query,
+ ServiceEmitter serviceEmitter
+ )
+ {
+ MetricsEmittingQueryRunner<ResultRow> metricsEmittingQueryRunner =
+ new MetricsEmittingQueryRunner<ResultRow>(
+ serviceEmitter,
+ factory.getToolchest(),
+ runner,
+ (obj, lng) -> {},
+ (metrics) -> {}
+ ).withWaitMeasuredFromNow();
+ QueryToolChest toolChest = factory.getToolchest();
+ QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
+
toolChest.mergeResults(toolChest.preMergeQueryDecoration(metricsEmittingQueryRunner)),
+ toolChest
+ );
+
+ return theRunner.run(QueryPlus.wrap(query)).toList();
+ }
+
public static ResultRow createExpectedRow(final GroupByQuery query, final
String timestamp, Object... vals)
{
return createExpectedRow(query, DateTimes.of(timestamp), vals);
diff --git
a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
index abd3f78f55..f0d87c9e36 100644
---
a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
@@ -36,9 +36,11 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@@ -211,7 +213,16 @@ public class ScanQueryRunnerTest extends
InitializedNullHandlingTest
.virtualColumns(EXPR_COLUMN)
.build();
- Iterable<ScanResultValue> results =
runner.run(QueryPlus.wrap(query)).toList();
+ StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
+ MetricsEmittingQueryRunner<ScanResultValue> metricsEmittingQueryRunner =
+ new MetricsEmittingQueryRunner<ScanResultValue>(
+ stubServiceEmitter,
+ TOOL_CHEST,
+ runner,
+ (obj, lng) -> {},
+ (metrics) -> {}
+ ).withWaitMeasuredFromNow();
+ Iterable<ScanResultValue> results =
metricsEmittingQueryRunner.run(QueryPlus.wrap(query)).toList();
List<ScanResultValue> expectedResults = toExpected(
toFullEvents(V_0112_0114),
@@ -219,6 +230,8 @@ public class ScanQueryRunnerTest extends
InitializedNullHandlingTest
0,
3
);
+ Assert.assertEquals(1, stubServiceEmitter.getEvents().size());
+ Assert.assertEquals(false,
stubServiceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null));
verify(expectedResults,
populateNullColumnAtLastForQueryableIndexCase(results, "null_column"));
}
diff --git
a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
index 05848a5e4d..74155ce51c 100644
---
a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
@@ -34,8 +34,10 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@@ -220,7 +222,16 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
.context(makeContext())
.build();
- Iterable<Result<TimeseriesResultValue>> results =
runner.run(QueryPlus.wrap(query)).toList();
+ StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
+ MetricsEmittingQueryRunner<Result<TimeseriesResultValue>>
metricsEmittingQueryRunner =
+ new MetricsEmittingQueryRunner<Result<TimeseriesResultValue>>(
+ stubServiceEmitter,
+ new TimeseriesQueryQueryToolChest(),
+ runner,
+ (obj, lng) -> {},
+ (metrics) -> {}
+ ).withWaitMeasuredFromNow();
+ Iterable<Result<TimeseriesResultValue>> results =
metricsEmittingQueryRunner.run(QueryPlus.wrap(query)).toList();
final String[] expectedIndex = descending ?
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES_DESC :
@@ -306,6 +317,11 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
++count;
}
+ Assert.assertEquals(1, stubServiceEmitter.getEvents().size());
+ Assert.assertEquals(
+ vectorize,
+
stubServiceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null)
+ );
Assert.assertEquals(lastResult.toString(), expectedLast,
lastResult.getTimestamp());
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
index 20812e7b88..72e1efd50e 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
@@ -158,7 +158,8 @@ public class IncrementalIndexStorageAdapterTest extends
InitializedNullHandlingT
.addDimension("sally")
.addAggregator(new LongSumAggregatorFactory("cnt",
"cnt"))
.build(),
- new IncrementalIndexStorageAdapter(index)
+ new IncrementalIndexStorageAdapter(index),
+ null
);
final List<Row> results = rows.toList();
@@ -236,7 +237,8 @@ public class IncrementalIndexStorageAdapterTest extends
InitializedNullHandlingT
)
)
.build(),
- new IncrementalIndexStorageAdapter(index)
+ new IncrementalIndexStorageAdapter(index),
+ null
);
final List<Row> results = rows.toList();
@@ -406,7 +408,8 @@ public class IncrementalIndexStorageAdapterTest extends
InitializedNullHandlingT
.addAggregator(new LongSumAggregatorFactory("cnt",
"cnt"))
.setDimFilter(DimFilters.dimEquals("sally", (String)
null))
.build(),
- new IncrementalIndexStorageAdapter(index)
+ new IncrementalIndexStorageAdapter(index),
+ null
);
final List<Row> results = rows.toList();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]