[
https://issues.apache.org/jira/browse/DRILL-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151218#comment-16151218
]
ASF GitHub Bot commented on DRILL-5716:
---------------------------------------
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/928#discussion_r136670926
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Global resource manager that provides basic admission control (AC)
+ * via a configured queue: either the Zookeeper-based distributed queue
+ * or the in-process embedded Drillbit queue. The queue places an upper
+ * limit on the number of running queries. This limit then "slices"
+ * memory and CPU between queries: each gets the same share of resources.
+ * <p>
+ * This is a "basic" implementation. Clearly, a more advanced
implementation
+ * could look at query cost to determine whether to give a given query more
+ * or less than the "standard" share. That is left as a future exercise;
+ * in this version we just want to get the basics working.
+ */
+
+public class ThrottledResourceManager extends AbstractResourceManager {
+
+ private static final org.slf4j.Logger logger =
+     org.slf4j.LoggerFactory.getLogger(ThrottledResourceManager.class);
+
+ public static class QueuedResourceAllocator implements QueryResourceAllocator {
+
+ protected final ThrottledResourceManager rm;
+ protected QueryContext queryContext;
+ protected PhysicalPlan plan;
+ protected QueryWorkUnit work;
+ protected double queryCost;
+
+ protected QueuedResourceAllocator(final ThrottledResourceManager rm,
+     QueryContext queryContext) {
+ this.rm = rm;
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public void visitAbstractPlan(PhysicalPlan plan) {
+ this.plan = plan;
+ queryCost = plan.totalCost();
+ }
+
+ @Override
+ public void visitPhysicalPlan(final QueryWorkUnit work) {
+ this.work = work;
+ planMemory();
+ }
+
+ private void planMemory() {
+ if (plan.getProperties().hasResourcePlan) {
+ logger.debug("Memory already planned.");
+ return;
+ }
+
+ // Group fragments by node.
+
+ Multimap<String,List<PhysicalOperator>> nodeMap = buildBufferedOpMap();
+
+ // Memory must be symmetric to avoid bottlenecks in which one node has
+ // sorts (say) with less memory than another, causing skew in data arrival
+ // rates for downstream operators.
+
+ int width = countBufferingOperators(nodeMap);
+
+ // Then, share memory evenly across all the sort operators on that node.
+ // This handles asymmetric distribution, such as occurs if a sort appears in
+ // the root fragment (the one with the screen), which is never parallelized.
+
+ for ( String key : nodeMap.keys() ) {
+ planNodeMemory(key, nodeMap.get(key), width);
+ }
+ }
+
+ private int countBufferingOperators(
+ Multimap<String, List<PhysicalOperator>> nodeMap) {
+ int width = 0;
+ for (List<PhysicalOperator> fragSorts : nodeMap.values()) {
+ width = Math.max(width, fragSorts.size());
+ }
+ return width;
+ }
+
+ /**
+ * Given the set of sorts (from any number of fragments) on a single node,
+ * share the per-query memory equally across all the sorts.
+ *
+ * @param nodeAddr address of the Drillbit node
+ * @param bufferedOps buffered (sort) operators, grouped by minor fragment, on this node
+ * @param width number of buffered operators among which to split the per-node memory
+ */
+
+ private void planNodeMemory(String nodeAddr,
+     Collection<List<PhysicalOperator>> bufferedOps, int width) {
+ int count = 0;
+ for (List<PhysicalOperator> fragOps : bufferedOps) {
+ count += fragOps.size();
+ }
+
+ // If no sorts, nothing to plan.
+
+ if (count == 0) {
+ return; }
+
+ // Divide node memory evenly among the set of sorts, in any minor
+ // fragment, on the node. This does not deal with the subtlety of one
+ // sort on top of another: the case in which the two sorts share memory.
+
+ long nodeMemory = queryMemoryPerNode();
+ long perOpMemory = nodeMemory / width;
+ logger.debug("Query: {}, Node: {}, allocating {} bytes each for {}
buffered operator(s).",
+ QueryIdHelper.getQueryId(queryContext.getQueryId()),
+ nodeAddr,
+ perOpMemory, width);
+
+ for (List<PhysicalOperator> fragOps : bufferedOps) {
+ for (PhysicalOperator op : fragOps) {
+
+ // Limit the memory to the maximum in the plan. Doing so is
+ // likely unnecessary, and perhaps harmful, because the pre-planned
+ // allocation is the default maximum hard-coded to 10 GB. This means
+ // that even if 20 GB is available to the sort, it won't use more
+ // than 10 GB. This is probably more of a bug than a feature.
+
+ long alloc = Math.max(perOpMemory, op.getInitialAllocation());
+ op.setMaxAllocation(alloc);
+ }
+ }
+ }
+
+ protected long queryMemoryPerNode() {
+ return rm.getDefaultMemoryPerNode(plan.totalCost());
+ }
+
+ /**
+ * Build a list of external sorts grouped by node. We start with a list
+ * of minor fragments, each with an endpoint (node). Multiple minor fragments
+ * may appear on each node, and each minor fragment may have 0, 1 or more
+ * sorts.
+ *
+ * @return a multimap of node address to the lists of buffered operators, one
+ * list per minor fragment on that node
+ */
+
+ private Multimap<String,List<PhysicalOperator>> buildBufferedOpMap() {
+ Multimap<String,List<PhysicalOperator>> map = ArrayListMultimap.create();
+ for (MinorFragmentDefn defn : work.getMinorFragmentDefns()) {
+ List<PhysicalOperator> bufferedOps = getBufferedOps(defn.root());
+ if (! bufferedOps.isEmpty()) {
+ map.put(defn.fragment().getAssignment().getAddress(), bufferedOps);
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Searches a fragment operator tree to find sorts within that fragment.
+ */
+
+ protected static class BufferedOpFinder extends
+     AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
+ @Override
+ public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
+     throws RuntimeException {
+ if (op.isBufferedOperator()) {
+ value.add(op);
+ }
+ visitChildren(op, value);
+ return null;
+ }
+ }
+
+ /**
+ * Search an individual fragment tree to find any sort operators it may contain.
+ *
+ * @param root root operator of the fragment to search
+ * @return the buffered (sort) operators found in that fragment
+ */
+
+ private List<PhysicalOperator> getBufferedOps(FragmentRoot root) {
+ List<PhysicalOperator> bufferedOps = new ArrayList<>();
+ BufferedOpFinder finder = new BufferedOpFinder();
+ root.accept(finder, bufferedOps);
+ return bufferedOps;
+ }
+ }
+
+ /**
+ * Per-query resource manager. Handles resources and optional queue lease for
+ * a single query. As such, this is a non-shared resource: it is associated with
+ * a foreman: a single thread at plan time, and a single event (in some thread)
--- End diff --
Fixed.
> Queue-based memory assignment for buffering operators
> -----------------------------------------------------
>
> Key: DRILL-5716
> URL: https://issues.apache.org/jira/browse/DRILL-5716
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.12.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
>
> Apache Drill already has a queueing feature based on ZK semaphores. We did a
> bit of testing to show that the feature does, in fact, work. We propose to
> enhance the feature with some light revisions to make it work with the "managed"
> external sort and the newly-added spilling feature for the hash agg operator.
> The key requirement is to build on what we have for now; we may want to
> tackle a larger project to create a more complete solution later.
> Existing functionality:
> * Two ZK-based queues called the “small” and “large” query queues.
> * A threshold, call it T, given as a query cost, to determine the queue into
> which a query will go.
> * Admit levels for the two queues: call them Qs and Ql.
> Basically, when a query comes in (the flow is sketched in code after this list):
> * Plan the query as usual.
> * Obtain the final query cost from the planner; call this C.
> * If C < T, the query goes into the small queue, else it goes into the large
> queue.
> * Suppose the query lands in the small queue. Ask ZK whether the query can run.
> * ZK checks whether Qs queries are already running. If so, the query waits;
> otherwise the query runs.
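> A minimal sketch of that admission flow in Java is below. It loosely follows the
> QueryQueue types imported in the diff above, but the method signatures and the
> helper names (admitAndRun, runFragments, failQuery) are illustrative assumptions,
> not the actual Drill API:
>   // Illustrative only: pick a queue from the planner cost, then block for admission.
>   void admitAndRun(QueryQueue smallQueue, QueryQueue largeQueue,
>       QueryId queryId, double cost, double threshold) throws Exception {
>     QueryQueue queue = (cost < threshold) ? smallQueue : largeQueue;  // compare C to T
>     QueueLease lease;
>     try {
>       lease = queue.enqueue(queryId, cost);   // waits until ZK admits the query
>     } catch (QueueTimeoutException e) {
>       failQuery(e);                           // hypothetical: waited too long for a slot
>       return;
>     }
>     try {
>       runFragments();                         // hypothetical stand-in for executing the query
>     } finally {
>       lease.release();                        // free the queue slot for the next query
>     }
>   }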
> The proposed changes include:
> * Refactor the code to provide a queueing API that supports a variety of
> queueing mechanisms (a sketch of such an API follows this list).
> * Provide three implementations: the null queue (default), an in-process queue
> (for testing) and the ZK queues.
> * Modify the query profile web UI to show two new bits of information about
> queues:
> - The queue to which the query was sent.
> - The total planning cost.
> * Modify the query profile web UI to show two memory assignment numbers:
> - Total memory allocated to the query
> - Memory per sort or hash-agg operator
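> To make the refactoring above concrete, the queueing API could look roughly like
> the sketch below. The type names echo QueryQueue, QueueLease, QueueTimeoutException
> and QueryQueueException from the diff's imports, but the methods shown are
> assumptions for illustration, not the committed interface:
>   public interface QueryQueue {
>     // Block until the query is admitted to its queue, or fail on timeout.
>     QueueLease enqueue(QueryId queryId, double cost)
>         throws QueueTimeoutException, QueryQueueException;
>     interface QueueLease {
>       long queryMemoryPerNode();   // memory granted to this query on each node
>       void release();              // return the queue slot when the query completes
>     }
>   }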
> Then, add to the queue mechanism the ability to do memory assignment (the
> arithmetic is sketched in code after this list):
> * Provide a weight, W: every small query gets 1 unit, every large query gets
> W units.
> * Use the queue admit levels to determine total units: U = Qs + W * Ql.
> * Obtain total direct memory from the system; call this M.
> * Subtract a reserve percentage R of M for overhead.
> * Do the math to get the memory per query for each query:
> * For the small queue: (M - R) / U
> * For the large queue: (M - R) / U * W
> * Use this memory amount as the “memory per query” number in the existing
> sort/hash-agg memory assignment (instead of the fixed 2 GB).
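> Expressed as code, the arithmetic above is roughly as follows. All of the values
> are hypothetical examples; in practice M comes from the system and the remaining
> numbers come from the queue configuration:
>   long totalDirect = 128L * 1024 * 1024 * 1024;        // M: total direct memory (e.g. 128 GB)
>   double reserve = 0.20;                               // R: reserve fraction for overhead
>   int smallAdmit = 10;                                 // Qs: admit level of the small queue
>   int largeAdmit = 5;                                  // Ql: admit level of the large queue
>   double weight = 10.0;                                // W: units per large query
>   double totalUnits = smallAdmit + weight * largeAdmit;          // U = Qs + W * Ql
>   long usable = (long) (totalDirect * (1.0 - reserve));          // M minus the reserve
>   long smallQueryMemory = (long) (usable / totalUnits);          // (M - R) / U
>   long largeQueryMemory = (long) (usable / totalUnits * weight); // (M - R) / U * W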
> The result will be a nice incremental addition to what we already have, and
> should make it a bit easier for people to actually use the feature (because they
> can see the planning numbers and the queues used, allowing them to
> effectively tune the system).
> The API used for the above features also allows third parties to add on a more
> robust admission control feature as needed, perhaps tying into an existing
> queueing mechanism of their choice.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)