[ 
https://issues.apache.org/jira/browse/DRILL-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151214#comment-16151214
 ] 

ASF GitHub Bot commented on DRILL-5716:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/928#discussion_r136668566
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
 ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.work.foreman.rm;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.xml.bind.annotation.XmlRootElement;
    +
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.coord.DistributedSemaphore;
    +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +import org.apache.drill.exec.proto.helper.QueryIdHelper;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.server.options.SystemOptionManager;
    +
    +/**
    + * Distributed query queue which uses a Zookeeper distributed semaphore to
    + * control queuing across the cluster. The distributed queue is actually 
two
    + * queues: one for "small" queries, another for "large" queries. Query 
size is
    + * determined by the Planner's estimate of query cost.
    + * <p>
    + * This queue is configured using system options:
    + * <dl>
    + * <dt><tt>exec.queue.enable</tt>
    + * <dt>
    + * <dd>Set to true to enable the distributed queue.</dd>
    + * <dt><tt>exec.queue.large</tt>
    + * <dt>
    + * <dd>The maximum number of large queries to admit. Additional
    + * queries wait in the queue.</dd>
    + * <dt><tt>exec.queue.small</tt>
    + * <dt>
    + * <dd>The maximum number of small queries to admit. Additional
    + * queries wait in the queue.</dd>
    + * <dt><tt>exec.queue.threshold</tt>
    + * <dt>
    + * <dd>The cost threshold. Queries below this size are small, at
    + * or above this size are large..</dd>
    + * <dt><tt>exec.queue.timeout_millis</tt>
    + * <dt>
    + * <dd>The maximum time (in milliseconds) a query will wait in the
    + * queue before failing.</dd>
    + * </dl>
    + * <p>
    + * The above values are refreshed every five seconds. This aids performance
    + * a bit in systems with very high query arrival rates.
    + */
    +
    +public class DistributedQueryQueue implements QueryQueue {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class);
    +
    +  private class DistributedQueueLease implements QueueLease {
    +    private final QueryId queryId;
    +    private DistributedLease lease;
    +    private final String queueName;
    +    private long queryMemory;
    +
    +    public DistributedQueueLease(QueryId queryId, String queueName, 
DistributedLease lease, long queryMemory) {
    +      this.queryId = queryId;
    +      this.queueName = queueName;
    +      this.lease = lease;
    +      this.queryMemory = queryMemory;
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return String.format("Lease for %s queue to query %s",
    +          queueName, QueryIdHelper.getQueryId(queryId));
    +    }
    +
    +    @Override
    +    public long queryMemoryPerNode() { return queryMemory; }
    +
    +    @Override
    +    public void release() {
    +      DistributedQueryQueue.this.release(this);
    +    }
    +
    +    @Override
    +    public String queueName() { return queueName; }
    +  }
    +
    +  /**
    +   * Exposes a snapshot of internal state information for use in status
    +   * reporting, such as in the UI.
    +   */
    +
    +  @XmlRootElement
    +  public static class ZKQueueInfo {
    +    public final int smallQueueSize;
    +    public final int largeQueueSize;
    +    public final double queueThreshold;
    +    public final long memoryPerNode;
    +    public final long memoryPerSmallQuery;
    +    public final long memoryPerLargeQuery;
    +
    +    public ZKQueueInfo(DistributedQueryQueue queue) {
    +      smallQueueSize = queue.smallQueueSize;
    +      largeQueueSize = queue.largeQueueSize;
    +      queueThreshold = queue.queueThreshold;
    +      memoryPerNode = queue.memoryPerNode;
    +      memoryPerSmallQuery = queue.memoryPerSmallQuery;
    +      memoryPerLargeQuery = queue.memoryPerLargeQuery;
    +    }
    +  }
    +
    +  private long memoryPerNode;
    +  private int largeQueueSize;
    +  private int smallQueueSize;
    +  private SystemOptionManager optionManager;
    +  private ClusterCoordinator clusterCoordinator;
    +  private long queueThreshold;
    +  private long queueTimeout;
    +  private long refreshTime;
    +  private long memoryPerSmallQuery;
    +  private long memoryPerLargeQuery;
    +  private double largeToSmallRatio;
    +
    +  public DistributedQueryQueue(DrillbitContext context) {
    +    optionManager = context.getOptionManager();
    +    clusterCoordinator = context.getClusterCoordinator();
    +  }
    +
    +  @Override
    +  public void setMemoryPerNode(long memoryPerNode) {
    +    this.memoryPerNode = memoryPerNode;
    +    refreshConfig();
    +  }
    +
    +  private void assignMemory() {
    +
    +    // Divide up memory between queues using admission rate
    +    // to give more memory to larger queries and less to
    +    // smaller queries. We assume that large queries are
    +    // larger than small queries by a factor of
    +    // largeToSmallRatio.
    +
    +    double totalUnits = largeToSmallRatio * largeQueueSize + 
smallQueueSize;
    +    double memoryUnit = memoryPerNode / totalUnits;
    +    memoryPerLargeQuery = Math.round(memoryUnit * largeToSmallRatio);
    +    memoryPerSmallQuery = Math.round(memoryUnit);
    +
    +    logger.debug("Distributed queue memory config: total memory = {}, 
large/small memory ratio = {}",
    +                 memoryPerNode, largeToSmallRatio);
    +    logger.debug("Small queue: {} slots, {} bytes per slot", 
smallQueueSize, memoryPerSmallQuery);
    +    logger.debug("Large queue: {} slots, {} bytes per slot", 
largeQueueSize, memoryPerLargeQuery);
    +  }
    +
    +  @Override
    +  public long getDefaultMemoryPerNode(double cost) {
    +    return (cost < queueThreshold) ? memoryPerSmallQuery : 
memoryPerLargeQuery;
    +  }
    +
    +  /**
    +   * This limits the number of "small" and "large" queries that a Drill 
cluster will run
    +   * simultaneously, if queuing is enabled. If the query is unable to run, 
this will block
    +   * until it can. Beware that this is called under run(), and so will 
consume a Thread
    --- End diff --
    
    Excellent points! This is why the ZK-based queue mechanism can't be our 
ultimate solution.
    
    A true scheduler should limit the queue length. But, note, we need to limit 
the *global* length. If the limit is 5 then the sixth query, *regardless of 
Drillbit*, should fail. But, with ZK queues, the best we can do is keep a 
per-Drillbit list of enqueued queries and have a per-node limit. This means 
that we'd use an unfair algorithm: whether your query is queued depends on 
whether you ended up on a busy Drillbit or not.
    
    The justification, I suppose, for current behavior is this. Without 
queueing, all queries run and use the same amount of memory in the Foreman 
*plus* a large amount of memory on each node. At least, here, only Foreman 
memory is used (plus a Foreman thread.)
    
    Note that the ZK-based mechanism does, in fact, have a queue timeout, so 
there is at least that safety-valve.


> Queue-based memory assignment for buffering operators
> -----------------------------------------------------
>
>                 Key: DRILL-5716
>                 URL: https://issues.apache.org/jira/browse/DRILL-5716
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.12.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>
> Apache Drill already has a queueing feature based on ZK semaphores. We did a 
> bit of testing to show that the feature does, in fact, work. We propose to 
> enhance the feature with some light revisions to make it work with the "managed" 
> external sort and the newly-added spilling feature for the hash agg operator. 
> The key requirement is to build on what we have for now; we may want to 
> tackle a larger project to create a more complete solution later.
> Existing functionality:
> * Two ZK-based queues called the “small” and “large” query queues.
> * A threshold, call it T, given as a query cost, to determine the queue into 
> which a query will go.
> * Admit levels for the two queues: call them Qs and Ql.
> Basically, when a query comes in:
> * Plan the query as usual.
> * Obtain the final query cost from the planner, call this C.
> * If C<T, the query goes into the small queue, else it goes into the large 
> queue.
> * Suppose the small queue. Ask ZK if the query can run.
> * ZK checks if Qs queries are already running. If so, the query waits, else 
> the query runs.
> The proposed changes include:
> * Refactor the code to provide a queueing API that supports a variety of 
> queuing mechanisms.
> * Provide three: the null queue (default), an in-process queue (for testing) 
> and the ZK queues.
> * Modify the query profile web UI to show two new bits of information about 
> queues:
> - The queue to which the query was sent.
> - The total planning cost.
> * Modify the query profile web UI to show two memory assignment numbers:
> - Total memory allocated to the query
> - Memory per sort or hash-agg operator
> Then, add to the queue mechanism the ability to do memory assignment:
> * Provide a weight, W: every small query gets 1 unit, every large query gets 
> W units.
> * Use the queue admit levels to determine total units: U = Qs + W * Ql.
> * Obtain total direct memory from the system. M.
> * Subtract a reserve percent R for overhead.
> * Do the math to get the memory per query for each queue:
> * For the small queue: (M - R) / U
> * For the large queue: (M - R) / U * W
> * Use this memory amount as the “memory per query” number in the existing 
> sort/hash-agg memory assignment (instead of the fixed 2 GB.)
> The result will be a nice incremental addition to what we already have, and 
> should make it a bit easier for people to actually use the feature (because they 
> can see the planning numbers and see the queues used, allowing them to 
> effectively tune the system.)
> The API used for the above features also allows third parties to add on a more 
> robust admission control feature as needed, perhaps tying into an existing 
> queueing mechanism of their choice.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to