[ 
https://issues.apache.org/jira/browse/DRILL-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151215#comment-16151215
 ] 

ASF GitHub Bot commented on DRILL-5716:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/928#discussion_r136670348
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.work.foreman.rm;
    +
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +
    +/**
    + * Interface which defines a queue implementation for query queues.
    + * Implementations can queue locally, queue distributed, or do
    + * nothing at all.
    + * <p>
    + * A queue can report itself as enabled or disabled. When enabled,
    + * all queries must obtain a lease prior to starting execution. The
    + * lease must be released at the completion of execution.
    + */
    +
    +public interface QueryQueue {
    +
    +  /**
    +   * The opaque lease returned once a query is admitted
    +   * for execution.
    +   */
    +
    +  public interface QueueLease {
    +    long queryMemoryPerNode();
    +
    +    /**
    +     * Release a query lease obtained from {@link #queue(QueryId, double)}.
    +     * Should be called by the per-query resource manager.
    +     */
    +
    +    void release();
    +
    +    String queueName();
    +  };
    +
    +  /**
    +   * Exception thrown if a query exceeds the configured wait time
    +   * in the query queue.
    +   */
    +
    +  @SuppressWarnings("serial")
    +  public class QueueTimeoutException extends Exception {
    +
    +    private QueryId queryId;
    +    private String queueName;
    +    private int timeoutMs;
    +
    +    public QueueTimeoutException(QueryId queryId, String queueName, int timeoutMs) {
    +      super( String.format(
    +          "Query timed out of the %s queue after %d ms.",
    +          queueName, timeoutMs ));
    +      this.queryId = queryId;
    +      this.queueName = queueName;
    +      this.timeoutMs = timeoutMs;
    +    }
    +
    +    public QueryId queryId() { return queryId; }
    +    public String queueName() { return queueName; }
    +    public int timeoutMs() { return timeoutMs; }
    +  }
    +
    +  /**
    +   * Exception thrown for all non-timeout error conditions.
    +   */
    +
    +  @SuppressWarnings("serial")
    +  public class QueryQueueException extends Exception {
    +    QueryQueueException(String msg, Exception e) {
    +      super(msg, e);
    +    }
    +  }
    +
    +  void setMemoryPerNode(long memoryPerNode);
    +
    +  /**
    +   * Return the amount of memory per node when creating an EXPLAIN
    +   * query plan. Plans to be executed should get the query memory from
    +   * the lease, as the lease may adjust the default amount on a per-query
    +   * basis. This means that the memory used to execute the query may
    +   * differ from the amount shown in an EXPLAIN plan.
    +   *
    +   * @return assumed memory per node, in bytes, to use when creating
    +   * an EXPLAIN plan
    +   */
    +
    +  long getDefaultMemoryPerNode(double cost);
    +
    +  /**
    +   * Determine if the queue is enabled.
    +   * @return true if the queue is enabled, false otherwise.
    +   */
    +
    +  boolean enabled();
    +
    +  /**
    +   * Queue a query. The method returns only when the query is admitted for
    +   * execution. As a result, the calling thread may block up to the configured
    --- End diff --
    
    See note above about the limitations of the ZK-based distributed queue.
    
    As noted, we can implement a limit when using the in-process, per-Foreman 
queue, but that only works for the embedded or single-server case.
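
    The lease contract in the interface Javadoc (obtain a lease before execution, release it on completion, fail after a configured wait) can be illustrated with a minimal in-process sketch. This is not Drill's implementation: the class `InProcessQueueSketch`, its `enqueue()` method, and the use of `java.util.concurrent.Semaphore` are assumptions made for illustration only.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in, NOT Drill's QueryQueue: a single in-process queue
// that admits at most maxConcurrent queries at a time.
class InProcessQueueSketch {

  /** Simplified lease: releasing it frees one admission slot. */
  final class Lease implements AutoCloseable {
    private boolean released;

    @Override
    public synchronized void close() {
      if (!released) {        // releasing twice must not free two slots
        released = true;
        slots.release();
      }
    }
  }

  private final Semaphore slots;
  private final long timeoutMs;

  InProcessQueueSketch(int maxConcurrent, long timeoutMs) {
    this.slots = new Semaphore(maxConcurrent, true); // fair: first come, first admitted
    this.timeoutMs = timeoutMs;
  }

  /** Blocks until the caller is admitted or the timeout expires. */
  Lease enqueue() throws InterruptedException, TimeoutException {
    if (!slots.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
      throw new TimeoutException(
          "Query timed out of the queue after " + timeoutMs + " ms.");
    }
    return new Lease();
  }

  /** Number of queries that could be admitted right now. */
  int availableSlots() {
    return slots.availablePermits();
  }
}
```

    A caller would typically wrap execution in `try (InProcessQueueSketch.Lease lease = queue.enqueue()) { ... }` so that the slot is released even if the query fails. Because the semaphore lives in one JVM, a sketch like this can only bound queries admitted by that one process.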


> Queue-based memory assignment for buffering operators
> -----------------------------------------------------
>
>                 Key: DRILL-5716
>                 URL: https://issues.apache.org/jira/browse/DRILL-5716
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.12.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>
> Apache Drill already has a queueing feature based on ZK semaphores. We did a
> bit of testing to show that the feature does, in fact, work. We propose to
> enhance the feature with some light revisions to make it work with the "managed"
> external sort and the newly added spilling feature for the hash agg operator.
> The key requirement is to build on what we have for now; we may want to
> tackle a larger project to create a more complete solution later.
> Existing functionality:
> * Two ZK-based queues called the “small” and “large” query queues.
> * A threshold, call it T, given as a query cost, to determine the queue into 
> which a query will go.
> * Admit levels for the two queues: call them Qs and Ql.
> Basically, when a query comes in:
> * Plan the query as usual.
> * Obtain the final query cost from the planner, call this C.
> * If C<T, the query goes into the small queue, else it goes into the large 
> queue.
> * Suppose the query goes into the small queue. Ask ZK whether the query can run.
> * ZK checks whether Qs queries are already running. If so, the query waits;
> otherwise, the query runs.
> The proposed changes include:
> * Refactor the code to provide a queueing API that supports a variety of 
> queuing mechanisms.
> * Provide three implementations: the null queue (default), an in-process
> queue (for testing), and the ZK queues.
> * Modify the query profile web UI to show two new bits of information about 
> queues:
> - The queue to which the query was sent.
> - The total planning cost.
> * Modify the query profile web UI to show two memory assignment numbers:
> - Total memory allocated to the query
> - Memory per sort or hash-agg operator
> Then, add to the queue mechanism the ability to do memory assignment:
> * Provide a weight, W: every small query gets 1 unit, every large query gets 
> W units.
> * Use the queue admit levels to determine total units: U = Qs + W * Ql.
> * Obtain total direct memory from the system, M.
> * Subtract a reserve percent R for overhead.
> * Do the math to get the memory per query for each queue:
> - For the small queue: (M - R) / U
> - For the large queue: (M - R) / U * W
> * Use this memory amount as the “memory per query” number in the existing 
> sort/hash-agg memory assignment (instead of the fixed 2 GB.)
> The result will be a nice incremental addition to what we already have, and
> should make it a bit easier for people to actually use the feature (because
> they can see the planning numbers and the queues used, allowing them to
> tune the system effectively).
> The API used for the above features also allows third parties to add on a more
> robust admission control feature as needed, perhaps tying into an existing
> queueing mechanism of their choice.
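
The queue selection rule (C < T) and the memory-per-query arithmetic in the description can be worked through in a short sketch. The class and method names below are hypothetical, and one point is an assumption: the description calls R a "reserve percent" but writes (M - R), so the sketch treats the reserve as a fraction of M and computes usable memory as M * (1 - fraction).

```java
// Hypothetical helper for illustration only; not part of Drill.
final class QueueMemorySketch {

  enum QueueKind { SMALL, LARGE }

  /** Cost below the threshold T goes to the small queue, otherwise the large one. */
  static QueueKind selectQueue(double cost, double threshold) {
    return cost < threshold ? QueueKind.SMALL : QueueKind.LARGE;
  }

  /**
   * Memory per query, in bytes.
   * ASSUMPTION: reserveFraction is the reserve expressed as a fraction of
   * totalMemory (e.g. 0.25 for 25%).
   */
  static long memoryPerQuery(QueueKind queue, long totalMemory, double reserveFraction,
                             int smallAdmit, int largeAdmit, double largeWeight) {
    double units = smallAdmit + largeWeight * largeAdmit;  // U = Qs + W * Ql
    double usable = totalMemory * (1.0 - reserveFraction); // M minus the reserve
    double perUnit = usable / units;                       // (M - R) / U
    // A small query gets one unit; a large query gets W units.
    return (long) (queue == QueueKind.SMALL ? perUnit : perUnit * largeWeight);
  }
}
```

With M = 40 GiB, a 25% reserve, Qs = 10, Ql = 2 and W = 10, the total is U = 10 + 10 * 2 = 30 units over 30 GiB of usable memory, so a small query gets 1 GiB and a large query gets 10 GiB.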



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
