[ https://issues.apache.org/jira/browse/DRILL-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195470#comment-16195470 ]
ASF GitHub Bot commented on DRILL-5716: --------------------------------------- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r143315595 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; + +/** + * Distributed query queue which uses a Zookeeper distributed semaphore to + * control queuing across the cluster. The distributed queue is actually two + * queues: one for "small" queries, another for "large" queries. 
Query size is + * determined by the Planner's estimate of query cost. + * <p> + * This queue is configured using system options: + * <dl> + * <dt><tt>exec.queue.enable</tt> + * <dt> + * <dd>Set to true to enable the distributed queue.</dd> + * <dt><tt>exec.queue.large</tt> + * <dt> + * <dd>The maximum number of large queries to admit. Additional + * queries wait in the queue.</dd> + * <dt><tt>exec.queue.small</tt> + * <dt> + * <dd>The maximum number of small queries to admit. Additional + * queries wait in the queue.</dd> + * <dt><tt>exec.queue.threshold</tt> + * <dt> + * <dd>The cost threshold. Queries below this size are small, at + * or above this size are large..</dd> + * <dt><tt>exec.queue.timeout_millis</tt> + * <dt> + * <dd>The maximum time (in milliseconds) a query will wait in the + * queue before failing.</dd> + * </dl> + * <p> + * The above values are refreshed every five seconds. This aids performance + * a bit in systems with very high query arrival rates. + */ + +public class DistributedQueryQueue implements QueryQueue { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class); + + private class DistributedQueueLease implements QueueLease { + private final QueryId queryId; + private DistributedLease lease; + private final String queueName; + + /** + * Memory allocated to the query. Though all queries in the queue use + * the same memory allocation rules, those rules can change at any time + * as the user changes system options. This value captures the value + * calculated at the time that this lease was granted. 
+ */ + private long queryMemory; + + public DistributedQueueLease(QueryId queryId, String queueName, + DistributedLease lease, long queryMemory) { + this.queryId = queryId; + this.queueName = queueName; + this.lease = lease; + this.queryMemory = queryMemory; + } + + @Override + public String toString() { + return String.format("Lease for %s queue to query %s", + queueName, QueryIdHelper.getQueryId(queryId)); + } + + @Override + public long queryMemoryPerNode() { return queryMemory; } + + @Override + public void release() { + DistributedQueryQueue.this.release(this); + } + + @Override + public String queueName() { return queueName; } + } + + /** + * Exposes a snapshot of internal state information for use in status + * reporting, such as in the UI. + */ + + @XmlRootElement + public static class ZKQueueInfo { + public final int smallQueueSize; + public final int largeQueueSize; + public final double queueThreshold; + public final long memoryPerNode; + public final long memoryPerSmallQuery; + public final long memoryPerLargeQuery; + + public ZKQueueInfo(DistributedQueryQueue queue) { + smallQueueSize = queue.configSet.smallQueueSize; + largeQueueSize = queue.configSet.largeQueueSize; + queueThreshold = queue.configSet.queueThreshold; + memoryPerNode = queue.memoryPerNode; + memoryPerSmallQuery = queue.memoryPerSmallQuery; + memoryPerLargeQuery = queue.memoryPerLargeQuery; + } + } + + public interface StatusAdapter { + boolean inShutDown(); + } + + /** + * Holds runtime configuration options. Allows polling the options + * for changes, and easily detecting changes. 
+ */ + + private static class ConfigSet { + private final long queueThreshold; + private final long queueTimeout; + private final int largeQueueSize; + private final int smallQueueSize; + private final double largeToSmallRatio; + private final double reserveMemoryRatio; + private final long minimumOperatorMemory; + + public ConfigSet(SystemOptionManager optionManager) { + queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE); + queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT); + largeQueueSize = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE); + smallQueueSize = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE); + largeToSmallRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO); + reserveMemoryRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE); + minimumOperatorMemory = optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP); + } + + @Override + public boolean equals(Object other) { --- End diff -- Looks like Drill convention is that `equals()` is to be used only for collections (where `hashCode()` is also needed.) To avoid the misuse of `equals()` in this use case, I renamed the method to `isSameAs()` with the argument being of the same type. I understand the issue, "equality" (or "sameness") is a deep philosophical discussion. (Remember the Heraclitus quote: "No person ever steps in the same river twice, for it's not the same river and he/she is not the same person.") Here, I just want to avoid keeping two copies of a zillion variables. It is just a convenience to have two instances of a class instead... 
> Queue-based memory assignment for buffering operators > ----------------------------------------------------- > > Key: DRILL-5716 > URL: https://issues.apache.org/jira/browse/DRILL-5716 > Project: Apache Drill > Issue Type: Improvement > Affects Versions: 1.11.0 > Reporter: Paul Rogers > Assignee: Paul Rogers > Labels: doc-impacting > Fix For: 1.12.0 > > > Apache Drill already has a queueing feature based on ZK semaphores. We did a > bit of testing to show that the feature does, in fact, work. We propose to > enhance the feature with some light revisions to make it work with the "managed" > external sort and the newly-added spilling feature for the hash agg operator. > The key requirement is to build on what we have for now; we may want to > tackle a larger project to create a more complete solution later. > Existing functionality: > * Two ZK-based queues called the “small” and “large” query queues. > * A threshold, call it T, given as a query cost, to determine the queue into > which a query will go. > * Admit levels for the two queues: call them Qs and Ql. > Basically, when a query comes in: > * Plan the query as usual. > * Obtain the final query cost from the planner, call this C. > * If C<T, the query goes into the small queue, else it goes into the large > queue. > * Suppose it is the small queue. Ask ZK if the query can run. > * ZK checks if Qs queries are already running. If so, the query waits, else > the query runs. > The proposed changes include: > * Refactor the code to provide a queueing API that supports a variety of > queuing mechanisms. > * Provide three: the null queue (default), an in-process queue (for testing) > and the ZK queues. > * Modify the query profile web UI to show two new bits of information about > queues: > - The queue to which the query was sent. > - The total planning cost. 
> * Modify the query profile web UI to show two memory assignment numbers: > - Total memory allocated to the query > - Memory per sort or hash-agg operator > Then, add to the queue mechanism the ability to do memory assignment: > * Provide a weight, W: every small query gets 1 unit, every large query gets > W units. > * Use the queue admit levels to determine total units: U = Qs + W * Ql. > * Obtain total direct memory from the system, call it M. > * Subtract a reserve percent R for overhead. > * Do the math to get the memory per query for each query: > * For the small queue: (M - R) / U > * For the large queue: (M - R) / U * W > * Use this memory amount as the “memory per query” number in the existing > sort/hash-agg memory assignment (instead of the fixed 2 GB.) > The result will be a nice incremental addition to what we already have, and > should make it a bit easier for people to actually use the feature (because they > can see the planning numbers and see the queues used, allowing them to > effectively tune the system.) > The API used for the above features also allows third parties to add on a more > robust admission control feature as needed, perhaps tying into an existing > queueing mechanism of their choice. -- This message was sent by Atlassian JIRA (v6.4.14#64029)