gianm commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1546905734
##########
processing/src/main/java/org/apache/druid/query/FluentQueryRunner.java:
##########
@@ -90,9 +90,9 @@ public FluentQueryRunner<T> postProcess(PostProcessingOperator<T> postProcessing
return from(postProcessing != null ? postProcessing.postProcess(baseRunner) : baseRunner);
}
- public FluentQueryRunner<T> mergeResults()
+ public FluentQueryRunner<T> mergeResults(boolean willMergeRunner)
Review Comment:
Please add javadoc for what `willMergeRunner` means. I recognize most other
stuff in here doesn't have javadocs, but, still.
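A minimal sketch of what that javadoc could say. It assumes a pared-down, hypothetical `FluentRunnerSketch` in place of `FluentQueryRunner`: runners are modelled as String labels and the toolchest merge as a `BiFunction`, so none of this is Druid's real API. The `@param` wording paraphrases the PR's own `QueryToolChest` javadoc:

```java
import java.util.function.BiFunction;

/**
 * Hypothetical, pared-down stand-in for FluentQueryRunner. Runners are modelled as String
 * labels so the example compiles on its own.
 */
final class FluentRunnerSketch
{
  private final String baseRunner;
  // Stand-in for QueryToolChest#mergeResults(QueryRunner, boolean).
  private final BiFunction<String, Boolean, String> toolChestMerge;

  FluentRunnerSketch(String baseRunner, BiFunction<String, Boolean, String> toolChestMerge)
  {
    this.baseRunner = baseRunner;
    this.toolChestMerge = toolChestMerge;
  }

  String runner()
  {
    return baseRunner;
  }

  /**
   * Wraps the current runner in the toolchest's result-merging runner.
   *
   * @param willMergeRunner {@code true} if the current runner is the one returned by
   *                        {@code QueryRunnerFactory#mergeRunners}, so the toolchest may
   *                        acquire merge resources (for example, group-by merge buffers)
   *                        before execution begins; {@code false} if it is not, in which
   *                        case those resources should not be acquired here
   */
  FluentRunnerSketch mergeResults(boolean willMergeRunner)
  {
    // Forward the flag unchanged; deciding what to reserve is the toolchest's job.
    return new FluentRunnerSketch(toolChestMerge.apply(baseRunner, willMergeRunner), toolChestMerge);
  }
}
```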
##########
server/src/main/java/org/apache/druid/server/ResourceIdPopulatingQueryRunner.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+
+/**
+ * Populates {@link org.apache.druid.query.QueryContexts#QUERY_RESOURCE_ID} in the query context
+ */
+public class ResourceIdPopulatingQueryRunner<T> implements QueryRunner<T>
+{
+ private final QueryRunner<T> baseRunner;
+
+ public ResourceIdPopulatingQueryRunner(QueryRunner<T> baseRunner)
+ {
+ this.baseRunner = baseRunner;
+ }
+
+ @Override
+ public Sequence<T> run(
+ final QueryPlus<T> queryPlus,
+ final ResponseContext responseContext
+ )
+ {
+ return baseRunner.run(
+ queryPlus.withQuery(
+ ClientQuerySegmentWalker.populateResourceId(queryPlus.getQuery())
Review Comment:
Would make more sense to me if `populateResourceId` was here, rather than in
`ClientQuerySegmentWalker`.
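A hedged sketch of the suggested move, with `populateResourceId` as a static helper on the runner itself. Assumptions: a query is modelled as its plain context map, the key `"queryResourceId"` stands in for `QueryContexts#QUERY_RESOURCE_ID`, and keeping an ID that is already present is this sketch's choice, not necessarily what the PR's method does:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

/**
 * Hypothetical sketch: the runner owns populateResourceId instead of ClientQuerySegmentWalker.
 * A "query" is modelled as its context map; the key name below is an assumption.
 */
final class ResourceIdPopulatingRunnerSketch<T>
{
  // Assumed stand-in for QueryContexts#QUERY_RESOURCE_ID.
  static final String QUERY_RESOURCE_ID = "queryResourceId";

  private final Function<Map<String, Object>, T> baseRunner;

  ResourceIdPopulatingRunnerSketch(Function<Map<String, Object>, T> baseRunner)
  {
    this.baseRunner = baseRunner;
  }

  /**
   * Returns the context with a resource ID set. An existing ID is kept (a choice of this
   * sketch); otherwise a random UUID is added to a copy, leaving the input untouched.
   */
  static Map<String, Object> populateResourceId(Map<String, Object> context)
  {
    if (context.containsKey(QUERY_RESOURCE_ID)) {
      return context;
    }
    Map<String, Object> copy = new HashMap<>(context);
    copy.put(QUERY_RESOURCE_ID, UUID.randomUUID().toString());
    return Collections.unmodifiableMap(copy);
  }

  T run(Map<String, Object> context)
  {
    return baseRunner.apply(populateResourceId(context));
  }
}
```

Keeping the helper next to the only runner that calls it means the ID-assignment rule is documented and tested in one place.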
##########
processing/src/main/java/org/apache/druid/query/QueryToolChest.java:
##########
@@ -118,6 +118,22 @@ public QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner)
return new ResultMergeQueryRunner<>(runner, this::createResultComparator, this::createMergeFn);
}
+ /**
+ * Like {@link #mergeResults(QueryRunner)}, but additional context parameter to determine whether the input runner
+ * to the method would be the result from the corresponding {@link QueryRunnerFactory#mergeRunners}. Merging can
+ * require additional resources, like merge buffers for group-by queries, therefore the flag, can help
+ * determine if the mergeResults should acquire those resources for the merging runners, before beginning execution.
+ * If not overridden, this method will ignore the {@code willMergeRunner} parameter.
+ *
+ * Ideally {@link #mergeResults(QueryRunner)} should have delegated to this method after setting the default value of
Review Comment:
Couple questions about this:
- what should the "default value of `willMergeRunner`" be? It seems like a
bunch of places use `true`, but why is that?
- what should _new_ toolchests do? Is it ok to override just the new 2-arg
`mergeResults` call, or do both need to be overridden?
I'm hoping we can make this `willMergeRunner` piece more clear, since IMO
it's the main unclear thing left in the patch after the last round of changes.
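A minimal sketch of the overload arrangement the PR javadoc describes (the two-arg `mergeResults` ignores `willMergeRunner` unless overridden and calls the one-arg version), using hypothetical String-labelled stand-ins rather than Druid's classes. It shows why the second question matters: a toolchest that overrides only the two-arg overload is bypassed by every caller of the one-arg overload.

```java
/** Hypothetical stand-in for QueryToolChest; runners are modelled as String labels. */
class ToolChestSketch
{
  /** Pre-existing one-arg entry point. */
  String mergeResults(String runner)
  {
    return "defaultMerge(" + runner + ")";
  }

  /** Two-arg overload: unless overridden, ignores willMergeRunner, as the PR javadoc says. */
  String mergeResults(String runner, boolean willMergeRunner)
  {
    return mergeResults(runner);
  }
}

/** Hypothetical new toolchest that overrides only the two-arg overload. */
final class TwoArgOnlyToolChest extends ToolChestSketch
{
  @Override
  String mergeResults(String runner, boolean willMergeRunner)
  {
    // Only this path can reserve merge resources ahead of time.
    return (willMergeRunner ? "reserveThenMerge(" : "merge(") + runner + ")";
  }
}
```

Flipping the delegation so the one-arg overload calls the two-arg one with an explicit default would close that gap, but only if the base two-arg version then holds the default implementation itself; if each overload called the other, they would recurse indefinitely.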
##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java:
##########
@@ -102,35 +104,53 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
private final GroupingEngine groupingEngine;
private final GroupByQueryConfig queryConfig;
private final GroupByQueryMetricsFactory queryMetricsFactory;
+ private final GroupByResourcesReservationPool groupByResourcesReservationPool;
@VisibleForTesting
- public GroupByQueryQueryToolChest(GroupingEngine groupingEngine)
+ public GroupByQueryQueryToolChest(
+ GroupingEngine groupingEngine,
+ GroupByResourcesReservationPool groupByResourcesReservationPool
+ )
{
- this(groupingEngine, GroupByQueryConfig::new, DefaultGroupByQueryMetricsFactory.instance());
+ this(
+ groupingEngine,
+ GroupByQueryConfig::new,
+ DefaultGroupByQueryMetricsFactory.instance(),
+ groupByResourcesReservationPool
+ );
}
@Inject
public GroupByQueryQueryToolChest(
GroupingEngine groupingEngine,
Supplier<GroupByQueryConfig> queryConfigSupplier,
- GroupByQueryMetricsFactory queryMetricsFactory
+ GroupByQueryMetricsFactory queryMetricsFactory,
+ @Merging GroupByResourcesReservationPool groupByResourcesReservationPool
Review Comment:
As I understand it, there is no reason to use `@Merging` here, since there's
only one kind of `GroupByResourcesReservationPool`. (The annotations are used
to disambiguate when there are multiple kinds of some injectable key.)
##########
processing/src/main/java/org/apache/druid/query/QueryContext.java:
##########
@@ -591,6 +591,10 @@ public boolean isWindowingStrictValidation()
);
}
+ public String getQueryResourceId()
Review Comment:
IMO it'd be good to make this a wrapper around String like `ResourceId`.
That provides us a central place to put some javadocs that explain how
resources work, and link with `@link` and `@see` to other relevant files. It
also makes it easier to find usages in an IDE.
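A minimal sketch of what such a wrapper could look like. The name `ResourceId`, the `getQueryResourceId` accessor, and the `random()` factory are illustrative assumptions, not existing Druid API. Value-based `equals`/`hashCode` are what make it usable as a map key:

```java
import java.util.Objects;
import java.util.UUID;

/**
 * Hypothetical sketch of a ResourceId wrapper. A central type like this is where javadoc on
 * how per-query resources are reserved and released could live, with {@code @see} links to
 * the reservation pool and the runner that assigns IDs.
 */
final class ResourceId
{
  private final String queryResourceId;

  ResourceId(String queryResourceId)
  {
    this.queryResourceId = Objects.requireNonNull(queryResourceId, "queryResourceId");
  }

  /** Illustrative factory: a fresh random ID for a query that has none yet. */
  static ResourceId random()
  {
    return new ResourceId(UUID.randomUUID().toString());
  }

  String getQueryResourceId()
  {
    return queryResourceId;
  }

  // Value-based equality so instances work as ConcurrentHashMap keys.
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    return queryResourceId.equals(((ResourceId) o).queryResourceId);
  }

  @Override
  public int hashCode()
  {
    return queryResourceId.hashCode();
  }

  @Override
  public String toString()
  {
    return "ResourceId{" + queryResourceId + "}";
  }
}
```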
##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Merging;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Reserves the {@link GroupByQueryResources} for a given group by query and maps them to the query's resource ID.
+ */
+public class GroupByResourcesReservationPool
+{
+ /**
+ * Map of query's resource id -> group by resources reserved for the query to execute
+ */
+ final ConcurrentHashMap<String, GroupByQueryResources> pool = new ConcurrentHashMap<>();
Review Comment:
Use `ResourceId` as key once it exists
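A hedged sketch of the pool keyed by a resource-ID type instead of a raw `String`. `ReservationPoolSketch`, its nested `ResourceId`, and the `reserve`/`fetch`/`clean` method names are hypothetical; the generic `R` stands in for `GroupByQueryResources`, and rejecting a duplicate reservation with `IllegalStateException` is this sketch's own choice:

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of a reservation pool keyed by a resource-ID type rather than a raw
 * String. R stands in for GroupByQueryResources.
 */
final class ReservationPoolSketch<R>
{
  /** Minimal stand-in for the suggested ResourceId wrapper. */
  static final class ResourceId
  {
    private final String id;

    ResourceId(String id)
    {
      this.id = Objects.requireNonNull(id, "id");
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof ResourceId && id.equals(((ResourceId) o).id);
    }

    @Override
    public int hashCode()
    {
      return id.hashCode();
    }
  }

  /** Map of query resource ID -> resources reserved for that query to execute. */
  private final ConcurrentHashMap<ResourceId, R> pool = new ConcurrentHashMap<>();

  /** Records a reservation atomically; a second reservation for the same ID is rejected. */
  void reserve(ResourceId resourceId, R resources)
  {
    if (pool.putIfAbsent(resourceId, resources) != null) {
      throw new IllegalStateException("Resources already reserved for this resource ID");
    }
  }

  /** Returns the reserved resources, or null if nothing is reserved under this ID. */
  R fetch(ResourceId resourceId)
  {
    return pool.get(resourceId);
  }

  /** Removes the reservation and returns it so the caller can release the resources. */
  R clean(ResourceId resourceId)
  {
    return pool.remove(resourceId);
  }
}
```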
##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java:
##########
@@ -0,0 +1,96 @@
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Merging;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Reserves the {@link GroupByQueryResources} for a given group by query and maps them to the query's resource ID.
+ */
+public class GroupByResourcesReservationPool
+{
+ /**
+ * Map of query's resource id -> group by resources reserved for the query to execute
+ */
+ final ConcurrentHashMap<String, GroupByQueryResources> pool = new ConcurrentHashMap<>();
+
+ /**
+ * Buffer pool from where the merge buffers are picked and reserved
+ */
+ final BlockingPool<ByteBuffer> mergeBufferPool;
+
+ /**
+ * Group by query config of the server
+ */
+ final GroupByQueryConfig groupByQueryConfig;
+
+ @Inject
+ public GroupByResourcesReservationPool(
+ @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+ GroupByQueryConfig groupByQueryConfig
+ )
+ {
+ this.mergeBufferPool = mergeBufferPool;
+ this.groupByQueryConfig = groupByQueryConfig;
+ }
+
+ /**
+ * Reserves appropariate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map
Review Comment:
appropriate (spelling)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]