[ 
https://issues.apache.org/jira/browse/DRILL-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151218#comment-16151218
 ] 

ASF GitHub Bot commented on DRILL-5716:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/928#discussion_r136670926
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
 ---
    @@ -0,0 +1,305 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.work.foreman.rm;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import org.apache.drill.exec.ops.QueryContext;
    +import org.apache.drill.exec.physical.PhysicalPlan;
    +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
    +import org.apache.drill.exec.physical.base.FragmentRoot;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.proto.helper.QueryIdHelper;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.work.QueryWorkUnit;
    +import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
    +import org.apache.drill.exec.work.foreman.Foreman;
    +import 
org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
    +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease;
    +import 
org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
    +
    +import com.google.common.collect.ArrayListMultimap;
    +import com.google.common.collect.Multimap;
    +
    +/**
    + * Global resource manager that provides basic admission control (AC)
    + * via a configured queue: either the Zookeeper-based distributed queue
    + * or the in-process embedded Drillbit queue. The queue places an upper
    + * limit on the number of running queries. This limit then "slices"
    + * memory and CPU between queries: each gets the same share of resources.
    + * <p>
    + * This is a "basic" implementation. Clearly, a more advanced 
implementation
    + * could look at query cost to determine whether to give a given query more
    + * or less than the "standard" share. That is left as a future exercise;
    + * in this version we just want to get the basics working.
    + */
    +
    +public class ThrottledResourceManager extends AbstractResourceManager {
    +
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ThrottledResourceManager.class);
    +
    +  public static class QueuedResourceAllocator implements 
QueryResourceAllocator {
    +
    +    protected final ThrottledResourceManager rm;
    +    protected QueryContext queryContext;
    +    protected PhysicalPlan plan;
    +    protected QueryWorkUnit work;
    +    protected double queryCost;
    +
    +    protected QueuedResourceAllocator(final ThrottledResourceManager rm, 
QueryContext queryContext) {
    +      this.rm = rm;
    +      this.queryContext = queryContext;
    +    }
    +
    +    @Override
    +    public void visitAbstractPlan(PhysicalPlan plan) {
    +      this.plan = plan;
    +      queryCost = plan.totalCost();
    +    }
    +
    +    @Override
    +    public void visitPhysicalPlan(final QueryWorkUnit work) {
    +      this.work = work;
    +      planMemory();
    +    }
    +
    +    private void planMemory() {
    +      if (plan.getProperties().hasResourcePlan) {
    +        logger.debug("Memory already planned.");
    +        return;
    +      }
    +
    +      // Group fragments by node.
    +
    +      Multimap<String,List<PhysicalOperator>> nodeMap = 
buildBufferedOpMap();
    +
    +      // Memory must be symmetric to avoid bottlenecks in which one node 
has
    +      // sorts (say) with less memory than another, causing skew in data 
arrival
    +      // rates for downstream operators.
    +
    +      int width = countBufferingOperators(nodeMap);
    +
    +      // Then, share memory evenly across the
    +      // all sort operators on that node. This handles asymmetric 
distribution
    +      // such as occurs if a sort appears in the root fragment (the one 
with screen),
    +      // which is never parallelized.
    +
    +      for ( String key : nodeMap.keys() ) {
    +        planNodeMemory(key, nodeMap.get(key), width);
    +      }
    +    }
    +
    +    private int countBufferingOperators(
    +        Multimap<String, List<PhysicalOperator>> nodeMap) {
    +      int width = 0;
    +      for (List<PhysicalOperator> fragSorts : nodeMap.values()) {
    +        width = Math.max(width, fragSorts.size());
    +      }
    +      return width;
    +    }
    +
    +    /**
    +     * Given the set of sorts (from any number of fragments) on a single 
node,
    +     * shared the per-query memory equally across all the sorts.
    +     *
    +     * @param nodeAddr
    +     * @param bufferedOps
    +     * @param width
    +     */
    +
    +    private void planNodeMemory(String nodeAddr, 
Collection<List<PhysicalOperator>> bufferedOps, int width) {
    +      int count = 0;
    +      for (List<PhysicalOperator> fragOps : bufferedOps) {
    +        count += fragOps.size();
    +      }
    +
    +      // If no sorts, nothing to plan.
    +
    +      if (count == 0) {
    +        return; }
    +
    +      // Divide node memory evenly among the set of sorts, in any minor
    +      // fragment, on the node. This does not deal with the subtlety of one
    +      // sort on top of another: the case in which the two sorts share 
memory.
    +
    +      long nodeMemory = queryMemoryPerNode();
    +      long perOpMemory = nodeMemory / width;
    +      logger.debug("Query: {}, Node: {}, allocating {} bytes each for {} 
buffered operator(s).",
    +                   QueryIdHelper.getQueryId(queryContext.getQueryId()),
    +                   nodeAddr,
    +                   perOpMemory, width);
    +
    +      for (List<PhysicalOperator> fragOps : bufferedOps) {
    +        for (PhysicalOperator op : fragOps) {
    +
    +          // Limit the memory to the maximum in the plan. Doing so is
    +          // likely unnecessary, and perhaps harmful, because the 
pre-planned
    +          // allocation is the default maximum hard-coded to 10 GB. This 
means
    +          // that even if 20 GB is available to the sort, it won't use more
    +          // than 10GB. This is probably more of a bug than a feature.
    +
    +          long alloc = Math.max(perOpMemory, op.getInitialAllocation());
    +          op.setMaxAllocation(alloc);
    +        }
    +      }
    +    }
    +
    +    protected long queryMemoryPerNode() {
    +      return rm.getDefaultMemoryPerNode(plan.totalCost());
    +    }
    +
    +    /**
    +     * Build a list of external sorts grouped by node. We start with a list
    +     * of minor fragments, each with an endpoint (node). Multiple minor 
fragments
    +     * may appear on each node, and each minor fragment may have 0, 1 or 
more
    +     * sorts.
    +     *
    +     * @return
    +     */
    +
    +    private Multimap<String,List<PhysicalOperator>> buildBufferedOpMap() {
    +      Multimap<String,List<PhysicalOperator>> map = 
ArrayListMultimap.create();
    +      for (MinorFragmentDefn defn : work.getMinorFragmentDefns()) {
    +        List<PhysicalOperator> bufferedOps = getBufferedOps(defn.root());
    +        if (! bufferedOps.isEmpty()) {
    +          map.put(defn.fragment().getAssignment().getAddress(), 
bufferedOps);
    +        }
    +      }
    +      return map;
    +    }
    +
    +    /**
    +     * Searches a fragment operator tree to find sorts within that 
fragment.
    +     */
    +
    +    protected static class BufferedOpFinder extends 
AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
    +      @Override
    +      public Void visitOp(PhysicalOperator op, List<PhysicalOperator> 
value) throws RuntimeException {
    +        if (op.isBufferedOperator()) {
    +          value.add(op);
    +        }
    +        visitChildren(op, value);
    +        return null;
    +      }
    +    }
    +
    +    /**
    +     * Search an individual fragment tree to find any sort operators it 
may contain.
    +     * @param root
    +     * @return
    +     */
    +
    +    private List<PhysicalOperator> getBufferedOps(FragmentRoot root) {
    +      List<PhysicalOperator> bufferedOps = new ArrayList<>();
    +      BufferedOpFinder finder = new BufferedOpFinder();
    +      root.accept(finder, bufferedOps);
    +      return bufferedOps;
    +    }
    +  }
    +
    +  /**
    +   * Per-query resource manager. Handles resources and optional queue 
lease for
    +   * a single query. As such, this is a non-shared resource: it is 
associated with
    +   * a foreman: a single tread at plan time, and a single event (in some 
thread)
    --- End diff --
    
    Fixed.


> Queue-based memory assignment for buffering operators
> -----------------------------------------------------
>
>                 Key: DRILL-5716
>                 URL: https://issues.apache.org/jira/browse/DRILL-5716
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.12.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>
> Apache Drill already has a queueing feature based on ZK semaphores. We did a 
> bit of testing to show that the feature does, in fact, work. We propose to 
> enhance the feature with some light revisions to make it work with the "managed" 
> external sort and the newly-added spilling feature for the hash agg operator. 
> The key requirement is to build on what we have for now; we may want to 
> tackle a larger project to create a more complete solution later.
> Existing functionality:
> * Two ZK-based queues called the “small” and “large” query queues.
> * A threshold, call it T, given as a query cost, to determine the queue into 
> which a query will go.
> * Admit levels for the two queues: call them Qs and Ql.
> Basically, when a query comes in:
> * Plan the query as usual.
> * Obtain the final query cost from the planner, call this C.
> * If C<T, the query goes into the small queue, else it goes into the large 
> queue.
> * Suppose the small queue. Ask ZK if the query can run.
> * ZK checks if Qs queries are already running. If so, the query waits, else 
> the query runs.
> The proposed changes include:
> * Refactor the code to provide a queueing API that supports a variety of 
> queuing mechanisms.
> * Provide three: the null queue (default), an in-process queue (for testing) 
> and the ZK queues.
> * Modify the query profile web UI to show two new bits of information about 
> queues:
> - The queue to which the query was sent.
> - The total planning cost.
> * Modify the query profile web UI to show two memory assignment numbers:
> - Total memory allocated to the query
> - Memory per sort or hash-agg operator
> Then, add to the queue mechanism the ability to do memory assignment:
> * Provide a weight, W: every small query gets 1 unit, every large query gets 
> W units.
> * Use the queue admit levels to determine total units: U = Qs + W * Ql.
> * Obtain total direct memory from the system. M.
> * Subtract a reserve percent R for overhead.
> * Do the math to get the memory per query for each query:
> * For the small queue: (M - R) / U
> * For the large queue: (M - R) / U * W
> * Use this memory amount as the “memory per query” number in the existing 
> sort/hash-agg memory assignment (instead of the fixed 2 GB.)
> The result will be a nice incremental addition to what we already have, and 
> should make it a bit easier for people to actually use the feature (because they 
> can see the planning numbers and see the queues used, allowing them to 
> effectively tune the system.)
> The API used for the above features also allow third parties to add on a more 
> robust admission control feature as needed, perhaps tying into an existing 
> queueing mechanism of their choice.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to