[ https://issues.apache.org/jira/browse/DRILL-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16193032#comment-16193032 ]
ASF GitHub Bot commented on DRILL-5716:
---------------------------------------
Github user arina-ielchiieva commented on a diff in the pull request:
https://github.com/apache/drill/pull/928#discussion_r142726899
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.StatusAdapter;
+
+/**
+ * Wrapper around the default and/or distributed resource managers
+ * to allow dynamically enabling and disabling queueing.
+ */
+
+public class DynamicResourceManager implements ResourceManager {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicResourceManager.class);
+
+ private final DrillbitContext context;
+ private ResourceManager defaultRm;
+ private ResourceManager queueingRm;
+ private ResourceManager activeRm;
+ public long lastUpdateTime;
+ public int recheckDelayMs = 5000;
+
+ public DynamicResourceManager(final DrillbitContext context) {
+ this.context = context;
+ refreshRM();
+ }
+
+ public synchronized ResourceManager activeRM() {
+ refreshRM();
+ return activeRm;
+ }
+
+ @Override
+ public long memoryPerNode() {
+ return activeRm.memoryPerNode();
+ }
+
+ @Override
+ public int cpusPerNode() {
+ return activeRm.cpusPerNode();
+ }
+
+ @Override
+ public synchronized QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+ refreshRM();
+ return activeRm.newResourceAllocator(queryContext);
+ }
+
+ @Override
+ public synchronized QueryResourceManager newQueryRM(Foreman foreman) {
+ refreshRM();
+ return activeRm.newQueryRM(foreman);
+ }
+
+ private void refreshRM() {
+ long now = System.currentTimeMillis();
+ if (lastUpdateTime + recheckDelayMs >= now) {
+ return;
+ }
+ lastUpdateTime = now;
+ @SuppressWarnings("resource")
+ SystemOptionManager systemOptions = context.getOptionManager();
+ if (systemOptions.getOption(ExecConstants.ENABLE_QUEUE)) {
+ if (queueingRm == null) {
+ StatusAdapter statusAdapter = new StatusAdapter() {
+ @Override
+ public boolean inShutDown() {
+ // Drill provides no shutdown state at present. Once
+ // DRILL-4286 (graceful shutdown) is merged, use the
+ // new Drillbit status to determine when the Drillbit
+ // is shutting down.
+ return false;
+ }
+ };
+ queueingRm = new ThrottledResourceManager(context,
+ new DistributedQueryQueue(context, statusAdapter));
+ }
+ if (activeRm != queueingRm) {
+ logger.debug("Enabling ZK-based query queue.");
+ activeRm = queueingRm;
+ }
+ } else {
+ if (defaultRm == null) {
+ defaultRm = new DefaultResourceManager();
+ }
+ if (activeRm != defaultRm) {
+ logger.debug("Disabling ZK-based query queue.");
+ activeRm = defaultRm;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (defaultRm != null) {
+ defaultRm.close();
+ defaultRm = null;
+ }
--- End diff --
What if closing `defaultRm` fails? Is it OK that `queueingRm` then remains
unclosed?
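
To illustrate the concern, here is a minimal sketch of a `close()` that still closes the second manager when the first one throws. The `ResourceManager` interface and the `Holder` class below are stand-ins invented for this example, not the types from the pull request, and the sketch assumes `close()` can only fail with an unchecked exception.

```java
import java.util.ArrayList;
import java.util.List;

class CloseBothSketch {

  /** Stand-in for the real ResourceManager; only close() matters here. */
  interface ResourceManager {
    void close();
  }

  /** Hypothetical holder mirroring the defaultRm / queueingRm fields in the diff. */
  static class Holder {
    ResourceManager defaultRm;
    ResourceManager queueingRm;

    Holder(ResourceManager defaultRm, ResourceManager queueingRm) {
      this.defaultRm = defaultRm;
      this.queueingRm = queueingRm;
    }

    /** Closes both managers; a failure in the first does not skip the second. */
    void close() {
      RuntimeException first = null;
      try {
        if (defaultRm != null) {
          defaultRm.close();
        }
      } catch (RuntimeException e) {
        first = e;
      } finally {
        defaultRm = null;
      }
      try {
        if (queueingRm != null) {
          queueingRm.close();
        }
      } catch (RuntimeException e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      } finally {
        queueingRm = null;
      }
      // Report the first failure only after both close attempts were made.
      if (first != null) {
        throw first;
      }
    }
  }

  public static void main(String[] args) {
    List<String> closed = new ArrayList<>();
    Holder holder = new Holder(
        () -> { throw new IllegalStateException("default close failed"); },
        () -> closed.add("queueing"));
    try {
      holder.close();
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
    System.out.println("closed: " + closed); // prints "closed: [queueing]"
  }
}
```

Whether the first exception should be rethrown or only logged is a design choice for the PR author; the sketch rethrows it so the failure is not silently lost.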
> Queue-based memory assignment for buffering operators
> -----------------------------------------------------
>
> Key: DRILL-5716
> URL: https://issues.apache.org/jira/browse/DRILL-5716
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.11.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Apache Drill already has a queueing feature based on ZK semaphores. We did a
> bit of testing to show that the feature does, in fact, work. We propose to
> enhance the feature with some light revisions to make it work with the "managed"
> external sort and the newly added spilling feature for the hash agg operator.
> The key requirement is to build on what we have for now; we may want to
> tackle a larger project to create a more complete solution later.
> Existing functionality:
> * Two ZK-based queues called the “small” and “large” query queues.
> * A threshold, call it T, given as a query cost, to determine the queue into
> which a query will go.
> * Admit levels for the two queues: call them Qs and Ql.
> Basically, when a query comes in:
> * Plan the query as usual.
> * Obtain the final query cost from the planner, call this C.
> * If C<T, the query goes into the small queue, else it goes into the large
> queue.
> * Suppose the query goes to the small queue. Ask ZK if the query can run.
> * ZK checks if Qs queries are already running. If so, the query waits, else
> the query runs.
> The proposed changes include:
> * Refactor the code to provide a queueing API that supports a variety of
> queueing mechanisms.
> * Provide three implementations: the null queue (default), an in-process queue (for testing)
> and the ZK queues.
> * Modify the query profile web UI to show two new bits of information about
> queues:
> - The queue to which the query was sent.
> - The total planning cost.
> * Modify the query profile web UI to show two memory assignment numbers:
> - Total memory allocated to the query
> - Memory per sort or hash-agg operator
> Then, add to the queue mechanism the ability to do memory assignment:
> * Provide a weight, W: every small query gets 1 unit, every large query gets
> W units.
> * Use the queue admit levels to determine total units: U = Qs + W * Ql.
> * Obtain total direct memory from the system, M.
> * Subtract a reserve R (a percentage of M) for overhead.
> * Compute the memory per query for each queue:
> * For the small queue: (M - R) / U
> * For the large queue: (M - R) / U * W
> * Use this memory amount as the “memory per query” number in the existing
> sort/hash-agg memory assignment (instead of the fixed 2 GB.)
> The result will be a nice incremental addition to what we already have, and
> should make it a bit easier for people to actually use the feature (because they
> can see the planning numbers and see the queues used, allowing them to
> effectively tune the system.)
> The API used for the above features also allows third parties to add on a more
> robust admission control feature as needed, perhaps tying into an existing
> queueing mechanism of their choice.
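
The queue selection and memory arithmetic in the issue description above can be sketched as follows. This is only an illustration under stated assumptions: the class, method names, and sample numbers are made up for the example and are not Drill's API, and R is taken to be a fraction of M.

```java
class QueueMemorySketch {

  /** Queue choice from the description: cost C below threshold T goes to the small queue. */
  static String selectQueue(double cost, double threshold) {
    return cost < threshold ? "small" : "large";
  }

  /** Total memory units: U = Qs + W * Ql. */
  static long totalUnits(int smallAdmit, int largeAdmit, int weight) {
    return smallAdmit + (long) weight * largeAdmit;
  }

  /**
   * Memory per query: (M - R) / U for the small queue, (M - R) / U * W for the large queue.
   * Assumption: the reserve R is reserveFraction * M.
   */
  static long memoryPerQuery(long totalMemory, double reserveFraction,
                             int smallAdmit, int largeAdmit, int weight, boolean large) {
    long reserve = (long) (totalMemory * reserveFraction);
    long perUnit = (totalMemory - reserve) / totalUnits(smallAdmit, largeAdmit, weight);
    return large ? perUnit * weight : perUnit;
  }

  public static void main(String[] args) {
    // Made-up sample values: M = 10240 MB, 25% reserve, Qs = 4, Ql = 2, W = 3.
    long m = 10240;
    double r = 0.25;
    int qs = 4;
    int ql = 2;
    int w = 3;

    System.out.println(selectQueue(1000.0, 5000.0));              // prints "small"
    System.out.println(totalUnits(qs, ql, w));                    // prints 10
    System.out.println(memoryPerQuery(m, r, qs, ql, w, false));   // prints 768
    System.out.println(memoryPerQuery(m, r, qs, ql, w, true));    // prints 2304
  }
}
```

With these sample values, four small queries at 768 MB plus two large queries at 2304 MB add up to 7680 MB, which is exactly M minus the reserve.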