jihoonson closed pull request #5471: Implement force push down for nested group by query
URL: https://github.com/apache/incubator-druid/pull/5471
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
new file mode 100644
index 00000000000..1825c6d2450
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.base.Throwables;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.RetryUtil;
+import org.apache.druid.testing.utils.TestQueryHelper;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITNestedQueryPushDownTest extends AbstractIndexerTest
+{
+  private static final String WIKITICKER_DATA_SOURCE = "wikiticker";
+  private static final String WIKITICKER_INDEX_TASK = "/indexer/wikiticker_index_task.json";
+  private static final String WIKITICKER_QUERIES_RESOURCE = "/queries/nestedquerypushdown_queries.json";
+
+  @Inject
+  private CoordinatorResourceTestClient coordinatorClient;
+  @Inject
+  private TestQueryHelper queryHelper;
+
+  private static final Logger LOG = new Logger(ITNestedQueryPushDownTest.class);
+
+  @Inject
+  private IntegrationTestingConfig config;
+
+  @Inject
+  ClientInfoResourceTestClient clientInfoResourceTestClient;
+  @Test
+  public void testIndexData()
+  {
+    try {
+      loadData();
+      queryHelper.testQueriesFromFile(WIKITICKER_QUERIES_RESOURCE, 2);
+    }
+    catch (Exception e) {
+      LOG.error(e, "Error while testing");
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private void loadData() throws Exception
+  {
+    final String taskID = indexer.submitTask(getTaskAsString(WIKITICKER_INDEX_TASK));
+    LOG.info("TaskID for loading index task %s", taskID);
+    indexer.waitUntilTaskCompletes(taskID);
+    RetryUtil.retryUntilTrue(
+        () -> coordinator.areSegmentsLoaded(WIKITICKER_DATA_SOURCE), "Segment Load"
+    );
+  }
+}
diff --git a/integration-tests/src/test/resources/indexer/wikiticker_index_task.json
b/integration-tests/src/test/resources/indexer/wikiticker_index_task.json new file mode 100644 index 00000000000..5af8ae8cc24 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikiticker_index_task.json @@ -0,0 +1,84 @@ +{ + "type": "index", + "spec": { + "dataSchema": { + "dataSource": "wikiticker", + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "day", + "queryGranularity": "none", + "intervals": [ + "2015-09-12/2015-09-13" + ] + }, + "parser": { + "type": "hadoopyString", + "parseSpec": { + "format": "json", + "dimensionsSpec": { + "dimensions": [ + "channel", + "cityName", + "comment", + "countryIsoCode", + "countryName", + "isAnonymous", + "isMinor", + "isNew", + "isRobot", + "isUnpatrolled", + "metroCode", + "namespace", + "page", + "regionIsoCode", + "regionName", + "user" + ] + }, + "timestampSpec": { + "format": "auto", + "column": "time" + } + } + }, + "metricsSpec": [ + { + "name": "count", + "type": "count" + }, + { + "name": "added", + "type": "longSum", + "fieldName": "added" + }, + { + "name": "deleted", + "type": "longSum", + "fieldName": "deleted" + }, + { + "name": "delta", + "type": "longSum", + "fieldName": "delta" + }, + { + "name": "user_unique", + "type": "hyperUnique", + "fieldName": "user" + } + ] + }, + "ioConfig": { + "type": "index", + "firehose": { + "type": "local", + "baseDir": "/examples/quickstart/tutorial", + "filter": "wikiticker-2015-09-12-sampled.json.gz" + } + }, + "tuningConfig": { + "type": "index", + "targetPartitionSize" : 10000 + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json b/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json new file mode 100644 index 00000000000..4c0350c9585 --- /dev/null +++ b/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json @@ -0,0 +1,303 @@ +[ + { + "description": "Nested group by double agg query with force push down", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "query", + "query": { + "queryType": "groupBy", + "dataSource": "wikiticker", + "intervals": [ + "2015-09-12/2015-09-13" + ], + "granularity": "all", + "dimensions": [ + "channel", + "user" + ], + "metric": "added", + "aggregations": [ + { + "type": "longSum", + "name": "sumAdded", + "fieldName": "added" + } + ] + } + }, + "granularity": "all", + "dimension": "channel", + "aggregations": [ + { + "type": "longSum", + "name": "groupedSumAdded", + "fieldName": "sumAdded" + } + ], + "intervals": [ + "2015-09-12/2015-09-13" + ], + "context": { + "forcePushDownNestedQuery":"true" + } + }, + "expectedResults": [ + { + "version" : "v1", + "timestamp" : "2015-09-12T00:00:00.000Z", + "event" : { + "groupedSumAdded" : 9385573 + } + } + ] + }, + { + "description": "Nested group by query with force push down and renamed dimensions", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "query", + "query": { + "queryType": "groupBy", + "dataSource": "wikiticker", + "intervals": [ + "2015-09-12/2015-09-13" + ], + "granularity": "all", + "dimensions": [ + {"dimension" : "channel", "outputName" :"renamedChannel"}, + {"dimension" : "user", "outputName" :"renamedUser"} + ], + "metric": "added", + "aggregations": [ + { + "type": "longSum", + "name": "sumAdded", + "fieldName": "added" + } + ] + } + }, + "granularity": "all", + "dimension": "renamedChannel", + "aggregations": [ + { + "type": "longSum", + "name": "groupedSumAdded", + "fieldName": "sumAdded" + } + ], + "intervals": [ 
+ "2015-09-12/2015-09-13" + ], + "context": { + "forcePushDownNestedQuery":"true" + } + }, + "expectedResults": [ + { + "version" : "v1", + "timestamp" : "2015-09-12T00:00:00.000Z", + "event" : { + "groupedSumAdded" : 9385573 + } + } + ] + }, + { + "description": "Nested group by query with force push down and filter on outer and inner query", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "query", + "query": { + "queryType": "groupBy", + "dataSource": "wikiticker", + "intervals": [ + "2015-09-12/2015-09-13" + ], + "granularity": "all", + "dimensions": [ + {"dimension" : "channel", "outputName" :"renamedChannel"}, + {"dimension" : "user", "outputName" :"renamedUser"} + ], + "metric": "added", + "aggregations": [ + { + "type": "longSum", + "name": "sumAdded", + "fieldName": "added" + } + ], + "filter": { + "type": "or", + "fields": [ + { + "type": "selector", + "dimension": "channel", + "value": "#zh.wikipedia" + }, + { + "type": "selector", + "dimension": "channel", + "value": "#es.wikipedia" + } + ] + } + } + }, + "granularity": "all", + "dimension": "renamedChannel", + "aggregations": [ + { + "type": "longSum", + "name": "groupedSumAdded", + "fieldName": "sumAdded" + } + ], + "intervals": [ + "2015-09-12/2015-09-13" + ], + "filter": { + "type": "and", + "fields": [ + { + "type": "selector", + "dimension": "renamedChannel", + "value": "#zh.wikipedia" + } + ] + }, + "context": { + "forcePushDownNestedQuery":"true" + } + }, + "expectedResults": [ + { + "version" : "v1", + "timestamp" : "2015-09-12T00:00:00.000Z", + "event" : { + "groupedSumAdded" : 191033 + } + } + ] + }, + { + "description": "Nested group by query with force push down and having clause", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "query", + "query": { + "queryType": "groupBy", + "dataSource": "wikiticker", + "intervals": [ + "2015-09-12/2015-09-13" + ], + "granularity": "all", + "dimensions": [ + {"dimension" : "channel"}, + {"dimension" : "user"} + ], + "metric": "added", + "aggregations": [ + { + "type": "longSum", + "name": "sumAdded", + "fieldName": "added" + } + ] + } + }, + "granularity": "all", + "aggregations": [ + { + "type": "longSum", + "name": "outerSum", + "fieldName": "sumAdded" + } + ], + "intervals": [ + "2015-09-12/2015-09-13" + ], + "having": { + "type": "or", + "havingSpecs": [ + { + "type": "greaterThan", + "aggregation": "outerSum", + "value": 9385570 + } + ] + }, + "context": { + "forcePushDownNestedQuery":"true" + } + }, + "expectedResults": [ + { + "version" : "v1", + "timestamp" : "2015-09-12T00:00:00.000Z", + "event" : { + "outerSum" : 9385573 + } + } + ] + }, + { + "description": "Nested group by query with force push down and having clause. 
This test asserts that the post processing was invoked.", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "query", + "query": { + "queryType": "groupBy", + "dataSource": "wikiticker", + "intervals": [ + "2015-09-12/2015-09-13" + ], + "granularity": "all", + "dimensions": [ + {"dimension" : "channel"}, + {"dimension" : "user"} + ], + "metric": "added", + "aggregations": [ + { + "type": "longSum", + "name": "sumAdded", + "fieldName": "added" + } + ] + } + }, + "granularity": "all", + "aggregations": [ + { + "type": "longSum", + "name": "outerSum", + "fieldName": "sumAdded" + } + ], + "intervals": [ + "2015-09-12/2015-09-13" + ], + "having": { + "type": "or", + "havingSpecs": [ + { + "type": "greaterThan", + "aggregation": "outerSum", + "value": 100000000 + } + ] + }, + "context": { + "forcePushDownNestedQuery":"true" + } + }, + "expectedResults": [ + ] + } +] \ No newline at end of file diff --git a/processing/src/main/java/org/apache/druid/query/Query.java b/processing/src/main/java/org/apache/druid/query/Query.java index b01a10042ba..7f225c751dc 100644 --- a/processing/src/main/java/org/apache/druid/query/Query.java +++ b/processing/src/main/java/org/apache/druid/query/Query.java @@ -114,4 +114,14 @@ { return this; } + + default List<Interval> getIntervalsOfInnerMostQuery() + { + if (getDataSource() instanceof QueryDataSource) { + //noinspection unchecked + return ((QueryDataSource) getDataSource()).getQuery().getIntervalsOfInnerMostQuery(); + } else { + return getIntervals(); + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/SubqueryQueryRunner.java b/processing/src/main/java/org/apache/druid/query/SubqueryQueryRunner.java index 07b64cd3a77..50dca91101c 100644 --- a/processing/src/main/java/org/apache/druid/query/SubqueryQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/SubqueryQueryRunner.java @@ -20,6 +20,8 @@ package org.apache.druid.query; import org.apache.druid.java.util.common.guava.Sequence; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.query.groupby.GroupByQueryConfig; import java.util.Map; @@ -39,10 +41,29 @@ public SubqueryQueryRunner(QueryRunner<T> baseRunner) public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext) { DataSource dataSource = queryPlus.getQuery().getDataSource(); - if (dataSource instanceof QueryDataSource) { + boolean forcePushDownNestedQuery = queryPlus.getQuery() + .getContextBoolean( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, + false + ); + if (dataSource instanceof QueryDataSource && !forcePushDownNestedQuery) { return run(queryPlus.withQuery((Query<T>) ((QueryDataSource) dataSource).getQuery()), responseContext); } else { - return baseRunner.run(queryPlus, responseContext); + QueryPlus newQuery = queryPlus; + if (forcePushDownNestedQuery) { + // Disable any more push downs before firing off the query. 
But do let the historical know + // that it is executing the complete nested query and not just the inner most part of it + newQuery = queryPlus.withQuery( + queryPlus.getQuery() + .withOverriddenContext( + ImmutableMap.of( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false, + GroupByQueryConfig.CTX_KEY_EXECUTING_NESTED_QUERY, true + ) + ) + ); + } + return baseRunner.run(newQuery, responseContext); } } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index 2a653d920a0..2ec00256095 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -29,6 +29,8 @@ public static final String CTX_KEY_STRATEGY = "groupByStrategy"; public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown"; public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown"; + public static final String CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY = "forcePushDownNestedQuery"; + public static final String CTX_KEY_EXECUTING_NESTED_QUERY = "executingNestedQuery"; private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded"; private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows"; private static final String CTX_KEY_MAX_RESULTS = "maxResults"; @@ -74,6 +76,9 @@ @JsonProperty private boolean forcePushDownLimit = false; + @JsonProperty + private boolean forcePushDownNestedQuery = false; + @JsonProperty private boolean forceHashAggregation = false; @@ -163,6 +168,11 @@ public int getNumParallelCombineThreads() return numParallelCombineThreads; } + public boolean isForcePushDownNestedQuery() + { + return forcePushDownNestedQuery; + } + public GroupByQueryConfig withOverrides(final GroupByQuery query) { final GroupByQueryConfig newConfig = new GroupByQueryConfig(); @@ -198,6 +208,7 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query) ); newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit()); newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation()); + newConfig.forcePushDownNestedQuery = query.getContextBoolean(CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, isForcePushDownNestedQuery()); newConfig.intermediateCombineDegree = query.getContextValue( CTX_KEY_INTERMEDIATE_COMBINE_DEGREE, getIntermediateCombineDegree() @@ -226,6 +237,7 @@ public String toString() ", forceHashAggregation=" + forceHashAggregation + ", intermediateCombineDegree=" + intermediateCombineDegree + ", numParallelCombineThreads=" + numParallelCombineThreads + + ", forcePushDownNestedQuery=" + forcePushDownNestedQuery + '}'; } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 27e77feae14..4a9a298becd 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -149,6 +149,20 @@ public GroupByQueryQueryToolChest( QueryRunner<Row> runner, Map<String, Object> context ) + { + if (isNestedQueryPushDown(query, groupByStrategy)) { + return mergeResultsWithNestedQueryPushDown(groupByStrategy, query, resource, runner, context); + } + return 
mergeGroupByResultsWithoutPushDown(groupByStrategy, query, resource, runner, context); + } + + private Sequence<Row> mergeGroupByResultsWithoutPushDown( + GroupByStrategy groupByStrategy, + GroupByQuery query, + GroupByQueryResource resource, + QueryRunner<Row> runner, + Map<String, Object> context + ) { // If there's a subquery, merge subquery results and then apply the aggregator @@ -192,31 +206,21 @@ public GroupByQueryQueryToolChest( context ); - final Sequence<Row> finalizingResults; - if (QueryContexts.isFinalize(subquery, false)) { - finalizingResults = new MappedSequence<>( - subqueryResult, - makePreComputeManipulatorFn( - subquery, - MetricManipulatorFns.finalizing() - )::apply - ); - } else { - finalizingResults = subqueryResult; - } + final Sequence<Row> finalizingResults = finalizeSubqueryResults(subqueryResult, subquery); if (query.getSubtotalsSpec() != null) { return groupByStrategy.processSubtotalsSpec( query, resource, - groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults) + groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults, false) ); } else { return groupByStrategy.applyPostProcessing(groupByStrategy.processSubqueryResult( subquery, query, resource, - finalizingResults + finalizingResults, + false ), query); } @@ -233,6 +237,69 @@ public GroupByQueryQueryToolChest( } } + private Sequence<Row> mergeResultsWithNestedQueryPushDown( + GroupByStrategy groupByStrategy, + GroupByQuery query, + GroupByQueryResource resource, + QueryRunner<Row> runner, + Map<String, Object> context + ) + { + Sequence<Row> pushDownQueryResults = groupByStrategy.mergeResults(runner, query, context); + final Sequence<Row> finalizedResults = finalizeSubqueryResults(pushDownQueryResults, query); + GroupByQuery rewrittenQuery = rewriteNestedQueryForPushDown(query); + return groupByStrategy.applyPostProcessing(groupByStrategy.processSubqueryResult( + query, + rewrittenQuery, + resource, + finalizedResults, + true + ), query); + } + + /** + * Rewrite the aggregator and dimension specs since the push down nested query will return + * results with dimension and aggregation specs of the original nested query. 
+ */ + @VisibleForTesting + GroupByQuery rewriteNestedQueryForPushDown(GroupByQuery query) + { + return query.withAggregatorSpecs(Lists.transform(query.getAggregatorSpecs(), (agg) -> agg.getCombiningFactory())) + .withDimensionSpecs(Lists.transform( + query.getDimensions(), + (dim) -> new DefaultDimensionSpec( + dim.getOutputName(), + dim.getOutputName(), + dim.getOutputType() + ) + )); + } + + private Sequence<Row> finalizeSubqueryResults(Sequence<Row> subqueryResult, GroupByQuery subquery) + { + final Sequence<Row> finalizingResults; + if (QueryContexts.isFinalize(subquery, false)) { + finalizingResults = new MappedSequence<>( + subqueryResult, + makePreComputeManipulatorFn( + subquery, + MetricManipulatorFns.finalizing() + )::apply + ); + } else { + finalizingResults = subqueryResult; + } + return finalizingResults; + } + + public static boolean isNestedQueryPushDown(GroupByQuery q, GroupByStrategy strategy) + { + return q.getDataSource() instanceof QueryDataSource + && q.getContextBoolean(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false) + && q.getSubtotalsSpec() == null + && strategy.supportsNestedQueryPushDown(); + } + @Override public GroupByQueryMetrics makeMetrics(GroupByQuery query) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java index 53d581f9e7b..91bd4d35fe8 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -19,6 +19,7 @@ package org.apache.druid.query.groupby; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; @@ -108,4 +109,10 @@ public GroupByQueryRunner(Segment segment, final GroupByStrategySelector strateg return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter); } } + + @VisibleForTesting + public GroupByStrategySelector getStrategySelector() + { + return strategySelector; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 40ce4b8de85..171b8adca26 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -20,7 +20,6 @@ package org.apache.druid.query.groupby.epinephelinae; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.common.guava.SettableSupplier; @@ -65,7 +64,8 @@ public static Grouper createGrouper( final ObjectMapper spillMapper, final String processingTmpDir, final int mergeBufferSize, - final List<Closeable> closeOnExit + final List<Closeable> closeOnExit, + final boolean wasQueryPushedDown ) { final GroupByQuery query = (GroupByQuery) queryParam; @@ -76,50 +76,16 @@ public static Grouper createGrouper( aggregatorFactories[i] = query.getAggregatorSpecs().get(i); } - final File temporaryStorageDirectory = new File( processingTmpDir, StringUtils.format("druid-groupBy-%s_%s", UUID.randomUUID(), 
query.getId()) ); - final List<Interval> queryIntervals = query.getIntervals(); - final Filter filter = Filters.convertToCNFFromQueryContext( - query, - Filters.toFilter(query.getDimFilter()) - ); - - final SettableSupplier<Row> rowSupplier = new SettableSupplier<>(); - final RowBasedColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create( - rowSupplier, - rowSignature - ); - final ValueMatcher filterMatcher = filter == null - ? BooleanValueMatcher.of(true) - : filter.makeMatcher(columnSelectorFactory); - - final FilteredSequence<Row> filteredSequence = new FilteredSequence<>( - rows, - new Predicate<Row>() - { - @Override - public boolean apply(Row input) - { - boolean inInterval = false; - DateTime rowTime = input.getTimestamp(); - for (Interval queryInterval : queryIntervals) { - if (queryInterval.contains(rowTime)) { - inInterval = true; - break; - } - } - if (!inInterval) { - return false; - } - rowSupplier.set(input); - return filterMatcher.matches(); - } - } - ); + Sequence<Row> sequenceToGroup = rows; + // When query is pushed down, rows have already been filtered + if (!wasQueryPushedDown) { + sequenceToGroup = getFilteredSequence(rows, rowSignature, query); + } final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( temporaryStorageDirectory, @@ -152,7 +118,8 @@ public ByteBuffer get() final Accumulator<AggregateResult, Row> accumulator = pair.rhs; closeOnExit.add(grouper); - final AggregateResult retVal = filteredSequence.accumulate(AggregateResult.ok(), accumulator); + final AggregateResult retVal = sequenceToGroup.accumulate(AggregateResult.ok(), accumulator); + if (!retVal.isOk()) { throw new ResourceLimitExceededException(retVal.getReason()); } @@ -160,6 +127,47 @@ public ByteBuffer get() return grouper; } + private static Sequence<Row> getFilteredSequence( + Sequence<Row> rows, + Map<String, ValueType> rowSignature, + GroupByQuery query + ) + { + final List<Interval> queryIntervals = query.getIntervals(); + final Filter filter = Filters.convertToCNFFromQueryContext( + query, + Filters.toFilter(query.getDimFilter()) + ); + + final SettableSupplier<Row> rowSupplier = new SettableSupplier<>(); + final RowBasedColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create( + rowSupplier, + rowSignature + ); + final ValueMatcher filterMatcher = filter == null + ? 
BooleanValueMatcher.of(true) + : filter.makeMatcher(columnSelectorFactory); + + return new FilteredSequence<>( + rows, + input -> { + boolean inInterval = false; + DateTime rowTime = input.getTimestamp(); + for (Interval queryInterval : queryIntervals) { + if (queryInterval.contains(rowTime)) { + inInterval = true; + break; + } + } + if (!inInterval) { + return false; + } + rowSupplier.set(input); + return filterMatcher.matches(); + } + ); + } + public static Sequence<Row> getRowsFromGrouper(GroupByQuery query, List<String> subtotalSpec, Supplier<Grouper> grouper) { return new BaseSequence<>( @@ -183,5 +191,6 @@ public void cleanup(CloseableGrouperIterator<RowBasedKey, Row> iterFromMake) } } ); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java index c3e4d709a79..8588a42d6ae 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java @@ -82,7 +82,8 @@ GroupByQuery subquery, GroupByQuery query, GroupByQueryResource resource, - Sequence<Row> subqueryResult + Sequence<Row> subqueryResult, + boolean wasQueryPushedDown ); Sequence<Row> processSubtotalsSpec( @@ -100,4 +101,6 @@ GroupByQuery query, StorageAdapter storageAdapter ); + + boolean supportsNestedQueryPushDown(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java index fcc0fe410b2..cc26139dc6f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -159,7 +159,11 @@ public boolean doMergeResults(final GroupByQuery query) @Override public Sequence<Row> processSubqueryResult( - GroupByQuery subquery, GroupByQuery query, GroupByQueryResource resource, Sequence<Row> subqueryResult + GroupByQuery subquery, + GroupByQuery query, + GroupByQueryResource resource, + Sequence<Row> subqueryResult, + boolean wasQueryPushedDown ) { final Set<AggregatorFactory> aggs = Sets.newHashSet(); @@ -287,4 +291,10 @@ public boolean apply(AggregatorFactory agg) { return engine.process(query, storageAdapter); } + + @Override + public boolean supportsNestedQueryPushDown() + { + return false; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 0e3e441b138..1367fc62bbe 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -277,6 +277,16 @@ public boolean doMergeResults(final GroupByQuery query) @Override public Row apply(final Row row) { + if (query.getContextBoolean(GroupByQueryConfig.CTX_KEY_EXECUTING_NESTED_QUERY, false)) { + // When executing nested queries, we need to make sure that we are + // extracting out the event from the row. Post aggregators are not invoked since + // they should only be used after merging all the nested query responses. Timestamp + // if it needs to be fudged, it is ok to do here. + return new MapBasedRow( + fudgeTimestamp != null ? 
fudgeTimestamp : row.getTimestamp(), + ((MapBasedRow) row).getEvent() + ); + } // Apply postAggregators and fudgeTimestamp if present and if this is the outermost mergeResults. if (!query.getContextBoolean(CTX_KEY_OUTERMOST, true)) { @@ -323,7 +333,8 @@ public Row apply(final Row row) GroupByQuery subquery, GroupByQuery query, GroupByQueryResource resource, - Sequence<Row> subqueryResult + Sequence<Row> subqueryResult, + boolean wasQueryPushedDown ) { // This contains all closeable objects which are closed when the returned iterator iterates all the elements, @@ -335,13 +346,14 @@ public Row apply(final Row row) () -> GroupByRowProcessor.createGrouper( query, subqueryResult, - GroupByQueryHelper.rowSignatureFor(subquery), + GroupByQueryHelper.rowSignatureFor(wasQueryPushedDown ? query : subquery), configSupplier.get(), resource, spillMapper, processingConfig.getTmpDir(), processingConfig.intermediateComputeSizeBytes(), - closeOnExit + closeOnExit, + wasQueryPushedDown ) ); @@ -403,7 +415,8 @@ public Row apply(final Row row) spillMapper, processingConfig.getTmpDir(), processingConfig.intermediateComputeSizeBytes(), - closeOnExit + closeOnExit, + false ) ); List<Sequence<Row>> subtotalsResults = new ArrayList<>(subtotals.size()); @@ -476,4 +489,10 @@ public Row apply(final Row row) { return GroupByQueryEngineV2.process(query, storageAdapter, bufferPool, configSupplier.get().withOverrides(query)); } + + @Override + public boolean supportsNestedQueryPushDown() + { + return true; + } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java new file mode 100644 index 00000000000..7f21731901f --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java @@ -0,0 +1,1038 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.druid.query.groupby; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.commons.io.FileUtils; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.collections.NonBlockingPool; +import org.apache.druid.collections.StupidPool; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.js.JavaScriptConfig; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.BySegmentQueryRunner; +import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.FinalizeResultsQueryRunner; +import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.MetricManipulatorFns; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.extraction.RegexDimExtractionFn; +import org.apache.druid.query.filter.JavaScriptDimFilter; +import org.apache.druid.query.groupby.having.GreaterThanHavingSpec; +import org.apache.druid.query.groupby.strategy.GroupByStrategy; +import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class NestedQueryPushDownTest +{ + private static final IndexIO INDEX_IO; + private static final IndexMergerV9 INDEX_MERGER_V9; + public static final ObjectMapper JSON_MAPPER; + private File tmpDir; + private QueryRunnerFactory<Row, GroupByQuery> groupByFactory; + private QueryRunnerFactory<Row, GroupByQuery> groupByFactory2; + private List<IncrementalIndex> incrementalIndices = Lists.newArrayList(); + private List<QueryableIndex> groupByIndices = Lists.newArrayList(); + private ExecutorService executorService; + + static { + JSON_MAPPER = new DefaultObjectMapper(); + JSON_MAPPER.setInjectableValues( + new InjectableValues.Std().addValue( + ExprMacroTable.class, + ExprMacroTable.nil() + ) + ); + INDEX_IO = new IndexIO( + JSON_MAPPER, + () -> 0 + ); + INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); + } + + private IncrementalIndex makeIncIndex() + { + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withDimensionsSpec(new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("dimA"), + new StringDimensionSchema("dimB"), + new LongDimensionSchema("metA"), + new LongDimensionSchema("metB") + ), + null, + null + )) + .build() + ) + .setReportParseExceptions(false) + .setConcurrentEventAdd(true) + .setMaxRowCount(1000) + .buildOnheap(); + } + + @Before + public void setup() throws Exception + + { + tmpDir = Files.createTempDir(); + + InputRow row; + List<String> dimNames = Arrays.asList("dimA", "metA", "dimB", "metB"); + Map<String, Object> event; + + final IncrementalIndex indexA = makeIncIndex(); + incrementalIndices.add(indexA); + + event = new HashMap<>(); + event.put("dimA", "pomegranate"); + event.put("metA", 1000L); + event.put("dimB", "sweet"); + event.put("metB", 10L); + row = new MapBasedInputRow(1505260888888L, dimNames, event); + indexA.add(row); + + event = new HashMap<>(); + event.put("dimA", "mango"); + event.put("metA", 1000L); + event.put("dimB", "sweet"); + event.put("metB", 20L); + row = new MapBasedInputRow(1505260800000L, dimNames, event); + indexA.add(row); + + event = new HashMap<>(); + event.put("dimA", "pomegranate"); + event.put("metA", 1000L); + event.put("dimB", "sweet"); + event.put("metB", 10L); + row = new MapBasedInputRow(1505264400000L, dimNames, event); + indexA.add(row); + + event = new HashMap<>(); + event.put("dimA", "mango"); + event.put("metA", 1000L); + event.put("dimB", "sweet"); + event.put("metB", 20L); + row = new MapBasedInputRow(1505264400400L, dimNames, event); + indexA.add(row); + + final File fileA = INDEX_MERGER_V9.persist( + indexA, + new File(tmpDir, "A"), + new IndexSpec(), + null + ); + QueryableIndex qindexA = INDEX_IO.loadIndex(fileA); + + + final IncrementalIndex indexB = makeIncIndex(); + incrementalIndices.add(indexB); + + event = new HashMap<>(); + event.put("dimA", "pomegranate"); + event.put("metA", 1000L); + event.put("dimB", "sweet"); + event.put("metB", 10L); + row = new MapBasedInputRow(1505260800000L, dimNames, 
event); + indexB.add(row); + + event = new HashMap<>(); + event.put("dimA", "mango"); + event.put("metA", 1000L); + event.put("dimB", "sweet"); + event.put("metB", 20L); + row = new MapBasedInputRow(1505260800000L, dimNames, event); + indexB.add(row); + + event = new HashMap<>(); + event.put("dimA", "pomegranate"); + event.put("metA", 1000L); + event.put("dimB", "sour"); + event.put("metB", 10L); + row = new MapBasedInputRow(1505264400000L, dimNames, event); + indexB.add(row); + + event = new HashMap<>(); + event.put("dimA", "mango"); + event.put("metA", 1000L); + event.put("dimB", "sour"); + event.put("metB", 20L); + row = new MapBasedInputRow(1505264400000L, dimNames, event); + indexB.add(row); + + final File fileB = INDEX_MERGER_V9.persist( + indexB, + new File(tmpDir, "B"), + new IndexSpec(), + null + ); + QueryableIndex qindexB = INDEX_IO.loadIndex(fileB); + + groupByIndices = Arrays.asList(qindexA, qindexB); + setupGroupByFactory(); + } + + + private void setupGroupByFactory() + { + executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]"); + + NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>( + "GroupByBenchmark-computeBufferPool", + new OffheapBufferGenerator("compute", 10_000_000), + 0, + Integer.MAX_VALUE + ); + + // limit of 3 is required since we simulate running historical running nested query and broker doing the final merge + BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>( + new OffheapBufferGenerator("merge", 10_000_000), + 10 + ); + // limit of 3 is required since we simulate running historical running nested query and broker doing the final merge + BlockingPool<ByteBuffer> mergePool2 = new DefaultBlockingPool<>( + new OffheapBufferGenerator("merge", 10_000_000), + 10 + ); + + final GroupByQueryConfig config = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return "v2"; + } + + @Override + public int getBufferGrouperInitialBuckets() + { + return -1; + } + + @Override + public long getMaxOnDiskStorage() + { + return 1_000_000_000L; + } + }; + config.setSingleThreaded(false); + config.setMaxIntermediateRows(Integer.MAX_VALUE); + config.setMaxResults(Integer.MAX_VALUE); + + DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig() + { + @Override + public int getNumThreads() + { + // Used by "v2" strategy for concurrencyHint + return 2; + } + + @Override + public String getFormatString() + { + return null; + } + }; + + final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config); + final GroupByStrategySelector strategySelector = new GroupByStrategySelector( + configSupplier, + new GroupByStrategyV1( + configSupplier, + new GroupByQueryEngine(configSupplier, bufferPool), + NOOP_QUERYWATCHER, + bufferPool + ), + new GroupByStrategyV2( + druidProcessingConfig, + configSupplier, + bufferPool, + mergePool, + new ObjectMapper(new SmileFactory()), + NOOP_QUERYWATCHER + ) + ); + + final GroupByStrategySelector strategySelector2 = new GroupByStrategySelector( + configSupplier, + new GroupByStrategyV1( + configSupplier, + new GroupByQueryEngine(configSupplier, bufferPool), + NOOP_QUERYWATCHER, + bufferPool + ), + new GroupByStrategyV2( + druidProcessingConfig, + configSupplier, + bufferPool, + mergePool2, + new ObjectMapper(new SmileFactory()), + NOOP_QUERYWATCHER + ) + ); + + groupByFactory = new GroupByQueryRunnerFactory( + strategySelector, + new GroupByQueryQueryToolChest( + strategySelector, + NoopIntervalChunkingQueryRunnerDecorator() + ) + ); + + groupByFactory2 = new 
GroupByQueryRunnerFactory( + strategySelector2, + new GroupByQueryQueryToolChest( + strategySelector2, + NoopIntervalChunkingQueryRunnerDecorator() + ) + ); + } + + @After + public void tearDown() throws Exception + { + for (IncrementalIndex incrementalIndex : incrementalIndices) { + incrementalIndex.close(); + } + + for (QueryableIndex queryableIndex : groupByIndices) { + queryableIndex.close(); + } + + if (tmpDir != null) { + FileUtils.deleteDirectory(tmpDir); + } + } + + @Test + public void testSimpleDoubleAggregation() + { + + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L)) + ); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource("blah") + .setQuerySegmentSpec(intervalSpec) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimA", "dimA"), + new DefaultDimensionSpec("dimB", "dimB") + )) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("metASum", "metA"), + new LongSumAggregatorFactory("metBSum", "metB") + )) + .setGranularity(Granularities.ALL) + .build(); + + GroupByQuery nestedQuery = GroupByQuery + .builder() + .setDataSource(query) + .setQuerySegmentSpec(intervalSpec) + .setDimensions(Collections.singletonList( + new DefaultDimensionSpec("dimB", "dimB") + )) + .setAggregatorSpecs( + Collections.singletonList(new LongSumAggregatorFactory("totalSum", "metASum")) + ) + .setContext( + ImmutableMap.of( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true + ) + ) + .setGranularity(Granularities.ALL) + .build(); + + Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, Maps.newHashMap()); + List<Row> results = queryResult.toList(); + + Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( + "2017-07-14T02:40:00.000Z", + "dimB", "sour", + "totalSum", 2000L + ); + Row expectedRow1 = GroupByQueryRunnerTestHelper.createExpectedRow( + "2017-07-14T02:40:00.000Z", + "dimB", "sweet", + "totalSum", 6000L + ); + + Assert.assertEquals(2, results.size()); + Assert.assertEquals(expectedRow0, results.get(0)); + Assert.assertEquals(expectedRow1, results.get(1)); + } + + @Test + public void testNestedQueryWithRenamedDimensions() + { + + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L)) + ); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource("blah") + .setQuerySegmentSpec(intervalSpec) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimA", "dimA"), + new DefaultDimensionSpec("dimB", "newDimB") + )) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("metASum", "metA"), + new LongSumAggregatorFactory("metBSum", "metB") + )) + .setGranularity(Granularities.ALL) + .build(); + + GroupByQuery nestedQuery = GroupByQuery + .builder() + .setDataSource(query) + .setQuerySegmentSpec(intervalSpec) + .setDimensions(Collections.singletonList( + new DefaultDimensionSpec("newDimB", "renamedDimB") + )) + .setAggregatorSpecs( + Collections.singletonList(new LongMaxAggregatorFactory("maxBSum", "metBSum")) + ) + .setContext( + ImmutableMap.of( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true + ) + ) + .setGranularity(Granularities.ALL) + .build(); + + Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, Maps.newHashMap()); + List<Row> results = queryResult.toList(); + + Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( + "2017-07-14T02:40:00.000Z", 
+ "renamedDimB", "sour", + "maxBSum", 20L + ); + Row expectedRow1 = GroupByQueryRunnerTestHelper.createExpectedRow( + "2017-07-14T02:40:00.000Z", + "renamedDimB", "sweet", + "maxBSum", 60L + ); + Assert.assertEquals(2, results.size()); + Assert.assertEquals(expectedRow0, results.get(0)); + Assert.assertEquals(expectedRow1, results.get(1)); + } + + @Test + public void testDimensionFilterOnOuterAndInnerQueries() + { + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L)) + ); + GroupByQuery query = GroupByQuery + .builder() + .setDataSource("blah") + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimA", "dimA"), + new DefaultDimensionSpec("dimB", "dimB") + )) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("metASum", "metA"), + new LongSumAggregatorFactory("metBSum", "metB") + )) + .setGranularity(Granularities.ALL) + .setQuerySegmentSpec(intervalSpec) + .setDimFilter(new JavaScriptDimFilter( + "dimA", + "function(dim){ return dim == 'mango' }", + null, + JavaScriptConfig.getEnabledInstance() + )) + .build(); + + GroupByQuery nestedQuery = GroupByQuery + .builder() + .setDataSource(query) + .setDimensions(Collections.singletonList( + new DefaultDimensionSpec("dimA", "newDimA") + )) + .setAggregatorSpecs( + Collections.singletonList(new LongSumAggregatorFactory("finalSum", "metASum")) + ) + .setContext( + ImmutableMap.of( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true + ) + ) + .setGranularity(Granularities.ALL) + .setDimFilter(new JavaScriptDimFilter( + "dimA", + "function(dim){ return dim == 'pomegranate' }", + null, + JavaScriptConfig.getEnabledInstance() + )) + .setQuerySegmentSpec(intervalSpec) + .build(); + + Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, Maps.newHashMap()); + List<Row> results = queryResult.toList(); + + Assert.assertEquals(0, results.size()); + } + + @Test + public void testDimensionFilterOnOuterQuery() + { + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L)) + ); + GroupByQuery query = GroupByQuery + .builder() + .setDataSource("blah") + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimA", "dimA"), + new DefaultDimensionSpec("dimB", "dimB") + )) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("metASum", "metA"), + new LongSumAggregatorFactory("metBSum", "metB") + )) + .setGranularity(Granularities.ALL) + .setQuerySegmentSpec(intervalSpec) + .build(); + + GroupByQuery nestedQuery = GroupByQuery + .builder() + .setDataSource(query) + .setDimensions(Collections.singletonList( + new DefaultDimensionSpec("dimA", "newDimA") + )) + .setAggregatorSpecs( + Collections.singletonList(new LongSumAggregatorFactory("finalSum", "metASum")) + ) + .setContext( + ImmutableMap.of( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true + ) + ) + .setGranularity(Granularities.ALL) + .setDimFilter(new JavaScriptDimFilter( + "dimA", + "function(dim){ return dim == 'mango' }", + null, + JavaScriptConfig.getEnabledInstance() + )) + .setQuerySegmentSpec(intervalSpec) + .build(); + + Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( + "2017-07-14T02:40:00.000Z", + "finalSum", 4000L, + "newDimA", "mango" + ); + Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, Maps.newHashMap()); + List<Row> results = queryResult.toList(); + + Assert.assertEquals(1, 
results.size()); + Assert.assertEquals(expectedRow0, results.get(0)); + } + + @Test + public void testDimensionFilterOnInnerQuery() + { + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L)) + ); + GroupByQuery query = GroupByQuery + .builder() + .setDataSource("blah") + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimA", "dimA"), + new DefaultDimensionSpec("dimB", "dimB") + )) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("metASum", "metA"), + new LongSumAggregatorFactory("metBSum", "metB") + )) + .setGranularity(Granularities.ALL) + .setQuerySegmentSpec(intervalSpec) + .setDimFilter(new JavaScriptDimFilter( + "dimA", + "function(dim){ return dim == 'mango' }", + null, + JavaScriptConfig.getEnabledInstance() + )) + .build(); + + GroupByQuery nestedQuery = GroupByQuery + .builder() + .setDataSource(query) + .setDimensions(Collections.singletonList( + new DefaultDimensionSpec("dimA", "newDimA") + )) + .setAggregatorSpecs( + Collections.singletonList(new LongSumAggregatorFactory("finalSum", "metASum")) + ) + .setContext( + ImmutableMap.of( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true + ) + ) + .setGranularity(Granularities.ALL) + .setQuerySegmentSpec(intervalSpec) + .build(); + + Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( + "2017-07-14T02:40:00.000Z", + "finalSum", 4000L, + "newDimA", "mango" + ); + Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, Maps.newHashMap()); + List<Row> results = queryResult.toList(); + + Assert.assertEquals(1, results.size()); + Assert.assertEquals(expectedRow0, results.get(0)); + } + + @Test + public void testSubqueryWithExtractionFnInOuterQuery() + { + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L)) + ); + GroupByQuery query = GroupByQuery + .builder() + .setDataSource("blah") + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimA", "dimA"), + new DefaultDimensionSpec("dimB", "dimB") + )) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("metASum", "metA"), + new LongSumAggregatorFactory("metBSum", "metB") + )) + .setGranularity(Granularities.ALL) + .setQuerySegmentSpec(intervalSpec) + .build(); + + GroupByQuery nestedQuery = GroupByQuery + .builder() + .setDataSource(query) + .setDimensions(Collections.singletonList( + new DefaultDimensionSpec("dimA", "newDimA") + )) + .setDimensions(Collections.singletonList( + new ExtractionDimensionSpec( + "dimA", + "extractedDimA", + new RegexDimExtractionFn("^(p)", true, "replacement") + ) + ) + ) + .setAggregatorSpecs( + Collections.singletonList(new LongSumAggregatorFactory("finalSum", "metASum")) + ) + .setContext( + ImmutableMap.of( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true + ) + ) + .setGranularity(Granularities.ALL) + .setQuerySegmentSpec(intervalSpec) + .build(); + + Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( + "2017-07-14T02:40:00.000Z", + "finalSum", 4000L, + "extractedDimA", "p" + ); + Row expectedRow1 = GroupByQueryRunnerTestHelper.createExpectedRow( + "2017-07-14T02:40:00.000Z", + "finalSum", 4000L, + "extractedDimA", "replacement" + ); + Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, Maps.newHashMap()); + List<Row> results = queryResult.toList(); + + Assert.assertEquals(2, results.size()); + 
Assert.assertEquals(expectedRow0, results.get(0)); + Assert.assertEquals(expectedRow1, results.get(1)); + } + + @Test + public void testHavingClauseInNestedPushDownQuery() + { + QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L)) + ); + GroupByQuery innerQuery = GroupByQuery + .builder() + .setDataSource("blah") + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("dimA", "dimA"), + new DefaultDimensionSpec("dimB", "dimB") + )) + .setAggregatorSpecs( + Arrays.asList( + new LongSumAggregatorFactory("metASum", "metA"), + new LongSumAggregatorFactory("metBSum", "metB") + )) + .setGranularity(Granularities.ALL) + .setQuerySegmentSpec(intervalSpec) + .build(); + + GroupByQuery nestedQuery = GroupByQuery + .builder() + .setDataSource(innerQuery) + .setDimensions(Collections.singletonList( + new DefaultDimensionSpec("dimB", "dimB") + )) + .setAggregatorSpecs( + Collections.singletonList(new LongSumAggregatorFactory("finalSum", "metBSum")) + ) + .setHavingSpec(new GreaterThanHavingSpec("finalSum", 70L)) + .setContext( + ImmutableMap.of( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true + ) + ) + .setGranularity(Granularities.ALL) + .setQuerySegmentSpec(intervalSpec) + .build(); + + Row expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( + "2017-07-14T02:40:00.000Z", + "dimB", "sweet", + "finalSum", 90L + ); + Sequence<Row> queryResult = runNestedQueryWithForcePushDown(nestedQuery, Maps.newHashMap()); + List<Row> results = queryResult.toList(); + + Assert.assertEquals(1, results.size()); + Assert.assertEquals(expectedRow0, results.get(0)); + } + + private Sequence<Row> runNestedQueryWithForcePushDown(GroupByQuery nestedQuery, Map<String, Object> context) + { + QueryToolChest<Row, GroupByQuery> toolChest = groupByFactory.getToolchest(); + GroupByQuery pushDownQuery = nestedQuery; + QueryRunner<Row> segment1Runner = new FinalizeResultsQueryRunner<Row>( + toolChest.mergeResults( + groupByFactory.mergeRunners(executorService, getQueryRunnerForSegment1()) + ), + (QueryToolChest) toolChest + ); + + QueryRunner<Row> segment2Runner = new FinalizeResultsQueryRunner<Row>( + toolChest.mergeResults( + groupByFactory2.mergeRunners(executorService, getQueryRunnerForSegment2()) + ), + (QueryToolChest) toolChest + ); + + QueryRunner<Row> queryRunnerForSegments = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults( + (queryPlus, responseContext) -> Sequences + .simple( + ImmutableList.of( + Sequences.map( + segment1Runner.run(queryPlus, responseContext), + toolChest.makePreComputeManipulatorFn( + (GroupByQuery) queryPlus.getQuery(), + MetricManipulatorFns.deserializing() + ) + ), + Sequences.map( + segment2Runner.run(queryPlus, responseContext), + toolChest.makePreComputeManipulatorFn( + (GroupByQuery) queryPlus.getQuery(), + MetricManipulatorFns.deserializing() + ) + ) + ) + ) + .flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering()) + ), + (QueryToolChest) toolChest + ); + GroupByStrategy strategy = ((GroupByQueryRunnerFactory) groupByFactory).getStrategySelector() + .strategize(nestedQuery); + // Historicals execute the query with force push down flag as false + GroupByQuery queryWithPushDownDisabled = pushDownQuery.withOverriddenContext(ImmutableMap.of( + GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, + false + )); + Sequence<Row> pushDownQueryResults = strategy.mergeResults( + queryRunnerForSegments, + queryWithPushDownDisabled, + context + ); + + return 
toolChest.mergeResults((queryPlus, responseContext) -> pushDownQueryResults)
+        .run(QueryPlus.wrap(nestedQuery), context);
+  }
+
+  @Test
+  public void testQueryRewriteForPushDown()
+  {
+    QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
+        Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L))
+    );
+
+    String outputNameB = "dimBOutput";
+    String outputNameAgg = "totalSum";
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource("blah")
+        .setQuerySegmentSpec(intervalSpec)
+        .setDimensions(Lists.newArrayList(
+            new DefaultDimensionSpec("dimA", "dimA"),
+            new DefaultDimensionSpec("dimB", "dimB")
+        ))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                new LongSumAggregatorFactory("metASum", "metA"),
+                new LongSumAggregatorFactory("metBSum", "metB")
+            ))
+        .setGranularity(Granularities.ALL)
+        .build();
+
+    GroupByQuery nestedQuery = GroupByQuery
+        .builder()
+        .setDataSource(query)
+        .setQuerySegmentSpec(intervalSpec)
+        .setDimensions(Collections.singletonList(
+            new DefaultDimensionSpec("dimB", outputNameB)
+        ))
+        .setAggregatorSpecs(
+            Collections.singletonList(new LongSumAggregatorFactory(outputNameAgg, "metASum"))
+        )
+        .setContext(
+            ImmutableMap.of(
+                GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true
+            )
+        )
+        .setGranularity(Granularities.ALL)
+        .build();
+    QueryToolChest<Row, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    GroupByQuery rewrittenQuery = ((GroupByQueryQueryToolChest) toolChest).rewriteNestedQueryForPushDown(nestedQuery);
+    Assert.assertEquals(outputNameB, rewrittenQuery.getDimensions().get(0).getDimension());
+    Assert.assertEquals(outputNameAgg, rewrittenQuery.getAggregatorSpecs().get(0).getName());
+  }
+
+  private List<QueryRunner<Row>> getRunner1()
+  {
+    List<QueryRunner<Row>> runners = Lists.newArrayList();
+    QueryableIndex index = groupByIndices.get(0);
+    QueryRunner<Row> runner = makeQueryRunner(
+        groupByFactory,
+        index.toString(),
+        new QueryableIndexSegment(index.toString(), index)
+    );
+    runners.add(groupByFactory.getToolchest().preMergeQueryDecoration(runner));
+    return runners;
+  }
+
+  private List<QueryRunner<Row>> getRunner2()
+  {
+    List<QueryRunner<Row>> runners = Lists.newArrayList();
+    QueryableIndex index2 = groupByIndices.get(1);
+    QueryRunner<Row> tooSmallRunner = makeQueryRunner(
+        groupByFactory2,
+        index2.toString(),
+        new QueryableIndexSegment(index2.toString(), index2)
+    );
+    runners.add(groupByFactory2.getToolchest().preMergeQueryDecoration(tooSmallRunner));
+    return runners;
+  }
+
+  public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
+      QueryRunnerFactory<T, QueryType> factory,
+      String segmentId,
+      Segment adapter
+  )
+  {
+    return new FinalizeResultsQueryRunner<T>(
+        new BySegmentQueryRunner<T>(
+            segmentId, adapter.getDataInterval().getStart(),
+            factory.createRunner(adapter)
+        ),
+        (QueryToolChest<T, Query<T>>) factory.getToolchest()
+    );
+  }
+
+
+  private List<QueryRunner<Row>> getQueryRunnerForSegment1()
+  {
+    List<QueryRunner<Row>> runners = Lists.newArrayList();
+    QueryableIndex index = groupByIndices.get(0);
+    QueryRunner<Row> runner = makeQueryRunnerForSegment(
+        groupByFactory,
+        index.toString(),
+        new QueryableIndexSegment(index.toString(), index)
+    );
+    runners.add(groupByFactory.getToolchest().preMergeQueryDecoration(runner));
+    return runners;
+  }
+
+  private List<QueryRunner<Row>> getQueryRunnerForSegment2()
+  {
+    List<QueryRunner<Row>> runners = Lists.newArrayList();
+    QueryableIndex index2 = groupByIndices.get(1);
+    QueryRunner<Row> tooSmallRunner = makeQueryRunnerForSegment(
+        groupByFactory2,
+        index2.toString(),
+        new QueryableIndexSegment(index2.toString(), index2)
+    );
+    runners.add(groupByFactory2.getToolchest().preMergeQueryDecoration(tooSmallRunner));
+    return runners;
+  }
+
+  private static class OffheapBufferGenerator implements Supplier<ByteBuffer>
+  {
+    private static final Logger log = new Logger(OffheapBufferGenerator.class);
+
+    private final String description;
+    private final int computationBufferSize;
+    private final AtomicLong count = new AtomicLong(0);
+
+    public OffheapBufferGenerator(String description, int computationBufferSize)
+    {
+      this.description = description;
+      this.computationBufferSize = computationBufferSize;
+    }
+
+    @Override
+    public ByteBuffer get()
+    {
+      log.info(
+          "Allocating new %s buffer[%,d] of size[%,d]",
+          description,
+          count.getAndIncrement(),
+          computationBufferSize
+      );
+
+      return ByteBuffer.allocateDirect(computationBufferSize);
+    }
+  }
+
+  public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunnerForSegment(
+      QueryRunnerFactory<T, QueryType> factory,
+      String segmentId,
+      Segment adapter
+  )
+  {
+    return new FinalizeResultsQueryRunner<>(
+        new BySegmentQueryRunner<>(
+            segmentId, adapter.getDataInterval().getStart(),
+            factory.createRunner(adapter)
+        ),
+        (QueryToolChest<T, Query<T>>) factory.getToolchest()
+    );
+  }
+
+  public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
+  {
+    @Override
+    public void registerQuery(Query query, ListenableFuture future)
+    {
+
+    }
+  };
+
+  public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
+  {
+    return new IntervalChunkingQueryRunnerDecorator(null, null, null)
+    {
+      @Override
+      public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
+      {
+        return (queryPlus, responseContext) -> delegate.run(queryPlus, responseContext);
+      }
+    };
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 99df5f0a06f..3ec8b2db2dd 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -217,6 +217,7 @@ public CachingClusteredClient(
     private final int uncoveredIntervalsLimit;
     private final Query<T> downstreamQuery;
     private final Map<String, Cache.NamedKey> cachePopulatorKeyMap = Maps.newHashMap();
+    private final List<Interval> intervals;
 
     SpecificQueryRunnable(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
     {
@@ -233,6 +234,8 @@ public CachingClusteredClient(
       // and might blow up in some cases https://github.com/apache/incubator-druid/issues/2108
       this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query);
       this.downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext());
+      // For nested queries, we need to look at the intervals of the inner most query.
+      this.intervals = query.getIntervalsOfInnerMostQuery();
     }
 
     private ImmutableMap<String, Object> makeDownstreamQueryContext()
@@ -291,7 +294,7 @@ public CachingClusteredClient(
     {
       final List<TimelineObjectHolder<String, ServerSelector>> serversLookup = toolChest.filterSegments(
           query,
-          query.getIntervals().stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList())
+          intervals.stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList())
      );
 
       final Set<ServerToSegment> segments = Sets.newLinkedHashSet();
@@ -322,7 +325,7 @@ private void computeUncoveredIntervals(TimelineLookup<String, ServerSelector> ti
       final List<Interval> uncoveredIntervals = new ArrayList<>(uncoveredIntervalsLimit);
       boolean uncoveredIntervalsOverflowed = false;
 
-      for (Interval interval : query.getIntervals()) {
+      for (Interval interval : intervals) {
         Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval);
         long startMillis = interval.getStartMillis();
         long endMillis = interval.getEndMillis();
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 8e483479ae0..93a5aa2aeb6 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -42,6 +42,7 @@
 import org.apache.druid.query.PerSegmentOptimizingQueryRunner;
 import org.apache.druid.query.PerSegmentQueryOptimizationContext;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
@@ -110,6 +111,14 @@ public ServerManager(
     this.serverConfig = serverConfig;
   }
 
+  private DataSource getInnerMostDataSource(DataSource dataSource)
+  {
+    if (dataSource instanceof QueryDataSource) {
+      return getInnerMostDataSource(((QueryDataSource) dataSource).getQuery().getDataSource());
+    }
+    return dataSource;
+  }
+
   @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
   {
@@ -121,7 +130,7 @@ public ServerManager(
     final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
     final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
 
-    DataSource dataSource = query.getDataSource();
+    DataSource dataSource = getInnerMostDataSource(query.getDataSource());
     if (!(dataSource instanceof TableDataSource)) {
       throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
     }
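The CachingClusteredClient change above resolves segments against query.getIntervalsOfInnerMostQuery(), whose implementation is not included in this excerpt of the diff. A minimal sketch of the recursion it implies, written here as a hypothetical static helper (intervalsOfInnerMostQuery is an illustrative name, not the PR's code), assuming it unwraps QueryDataSource the same way as the getInnerMostDataSource() helper added to ServerManager above:

    // Illustrative sketch, not part of this PR's diff: the actual logic lives in
    // the Query implementations, which this excerpt does not include. Unwrap
    // nested QueryDataSources until the innermost query is reached, then return
    // that query's intervals, so broker segment lookup is driven by the data
    // that will actually be scanned.
    private static List<Interval> intervalsOfInnerMostQuery(Query<?> query)
    {
      if (query.getDataSource() instanceof QueryDataSource) {
        // Nested query: recurse into the query wrapped by the data source.
        return intervalsOfInnerMostQuery(((QueryDataSource) query.getDataSource()).getQuery());
      }
      return query.getIntervals();
    }

The ServerManager change is the data-node counterpart: with force push down enabled, a data node such as a historical receives the whole nested groupBy, so getQueryRunnerForIntervals first unwraps the data source and the existing TableDataSource check is then applied to the innermost table.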
