gianm commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1546905734


##########
processing/src/main/java/org/apache/druid/query/FluentQueryRunner.java:
##########
@@ -90,9 +90,9 @@ public FluentQueryRunner<T> 
postProcess(PostProcessingOperator<T> postProcessing
     return from(postProcessing != null ? 
postProcessing.postProcess(baseRunner) : baseRunner);
   }
 
-  public FluentQueryRunner<T> mergeResults()
+  public FluentQueryRunner<T> mergeResults(boolean willMergeRunner)

Review Comment:
   Please add javadoc for what `willMergeRunner` means. I recognize most other 
stuff in here doesn't have javadocs, but, still.



##########
server/src/main/java/org/apache/druid/server/ResourceIdPopulatingQueryRunner.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+
+/**
+ * Populates {@link org.apache.druid.query.QueryContexts#QUERY_RESOURCE_ID} in 
the query context
+ */
+public class ResourceIdPopulatingQueryRunner<T> implements QueryRunner<T>
+{
+  private final QueryRunner<T> baseRunner;
+
+  public ResourceIdPopulatingQueryRunner(QueryRunner<T> baseRunner)
+  {
+    this.baseRunner = baseRunner;
+  }
+
+  @Override
+  public Sequence<T> run(
+      final QueryPlus<T> queryPlus,
+      final ResponseContext responseContext
+  )
+  {
+    return baseRunner.run(
+        queryPlus.withQuery(
+            ClientQuerySegmentWalker.populateResourceId(queryPlus.getQuery())

Review Comment:
   Would make more sense to me if `populateResourceId` was here, rather than in 
`ClientQuerySegmentWalker`.



##########
processing/src/main/java/org/apache/druid/query/QueryToolChest.java:
##########
@@ -118,6 +118,22 @@ public QueryRunner<ResultType> 
mergeResults(QueryRunner<ResultType> runner)
     return new ResultMergeQueryRunner<>(runner, this::createResultComparator, 
this::createMergeFn);
   }
 
+  /**
+   * Like {@link #mergeResults(QueryRunner)}, but additional context parameter 
to determine whether the input runner
+   * to the method would be the result from the corresponding {@link 
QueryRunnerFactory#mergeRunners}. Merging can
+   * require additional resources, like merge buffers for group-by queries, 
therefore the flag, can help
+   * determine if the mergeResults should acquire those resources for the 
merging runners, before beginning execution.
+   * If not overridden, this method will ignore the {@code willMergeRunner} 
parameter.
+   *
+   * Ideally {@link #mergeResults(QueryRunner)} should have delegated to this 
method after setting the default value of

Review Comment:
   Couple questions about this:
   
   - what should the "default value of `willMergeRunner`" be? It seems like a 
bunch of places use `true`, but why is that?
   - what should _new_ toolchests do? Is it ok to override just the new 2-arg 
`mergeResults` call, or do both need to be overridden?
   
   I'm hoping we can make this `willMergeRunner` piece more clear, since IMO 
it's the main unclear thing left in the patch after the last round of changes.



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java:
##########
@@ -102,35 +104,53 @@ public class GroupByQueryQueryToolChest extends 
QueryToolChest<ResultRow, GroupB
   private final GroupingEngine groupingEngine;
   private final GroupByQueryConfig queryConfig;
   private final GroupByQueryMetricsFactory queryMetricsFactory;
+  private final GroupByResourcesReservationPool 
groupByResourcesReservationPool;
 
   @VisibleForTesting
-  public GroupByQueryQueryToolChest(GroupingEngine groupingEngine)
+  public GroupByQueryQueryToolChest(
+      GroupingEngine groupingEngine,
+      GroupByResourcesReservationPool groupByResourcesReservationPool
+  )
   {
-    this(groupingEngine, GroupByQueryConfig::new, 
DefaultGroupByQueryMetricsFactory.instance());
+    this(
+        groupingEngine,
+        GroupByQueryConfig::new,
+        DefaultGroupByQueryMetricsFactory.instance(),
+        groupByResourcesReservationPool
+    );
   }
 
   @Inject
   public GroupByQueryQueryToolChest(
       GroupingEngine groupingEngine,
       Supplier<GroupByQueryConfig> queryConfigSupplier,
-      GroupByQueryMetricsFactory queryMetricsFactory
+      GroupByQueryMetricsFactory queryMetricsFactory,
+      @Merging GroupByResourcesReservationPool groupByResourcesReservationPool

Review Comment:
   As I understand it— there is no reason to use `@Merging` here, since there's 
only one kind of `GroupByResourcesReservationPool`. (The annotations are used 
to disambiguate when there's multiple kinds of some injectable key.)



##########
processing/src/main/java/org/apache/druid/query/QueryContext.java:
##########
@@ -591,6 +591,10 @@ public boolean isWindowingStrictValidation()
     );
   }
 
+  public String getQueryResourceId()

Review Comment:
   IMO it'd be good to make this a wrapper around String like `ResourceId`. 
That provides us a central place to put some javadocs that explain how 
resources work, and link with `@link` and `@see` to other relevant files. It 
also makes it easier to find usages in an IDE.



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Merging;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Reserves the {@link GroupByQueryResources} for a given group by query and 
maps them to the query's resource ID.
+ */
+public class GroupByResourcesReservationPool
+{
+  /**
+   * Map of query's resource id -> group by resources reserved for the query 
to execute
+   */
+  final ConcurrentHashMap<String, GroupByQueryResources> pool = new 
ConcurrentHashMap<>();

Review Comment:
   Use `ResourceId` as key once it exists



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Merging;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Reserves the {@link GroupByQueryResources} for a given group by query and 
maps them to the query's resource ID.
+ */
+public class GroupByResourcesReservationPool
+{
+  /**
+   * Map of query's resource id -> group by resources reserved for the query 
to execute
+   */
+  final ConcurrentHashMap<String, GroupByQueryResources> pool = new 
ConcurrentHashMap<>();
+
+  /**
+   * Buffer pool from where the merge buffers are picked and reserved
+   */
+  final BlockingPool<ByteBuffer> mergeBufferPool;
+
+  /**
+   * Group by query config of the server
+   */
+  final GroupByQueryConfig groupByQueryConfig;
+
+  @Inject
+  public GroupByResourcesReservationPool(
+      @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+      GroupByQueryConfig groupByQueryConfig
+  )
+  {
+    this.mergeBufferPool = mergeBufferPool;
+    this.groupByQueryConfig = groupByQueryConfig;
+  }
+
+  /**
+   * Reserves appropariate resources, and maps it to the queryResourceId 
(usually the query's resource id) in the internal map

Review Comment:
   appropriate (spelling)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to