[
https://issues.apache.org/jira/browse/DRILL-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16193044#comment-16193044
]
ASF GitHub Bot commented on DRILL-5716:
---------------------------------------
Github user arina-ielchiieva commented on a diff in the pull request:
https://github.com/apache/drill/pull/928#discussion_r142948996
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Query queue to be used in an embedded Drillbit. This queue has scope of
only
+ * the one Drillbit (not even multiple Drillbits in the same process.)
Primarily
+ * intended for testing, but may possibly be useful for other embedded
+ * applications.
+ * <p>
+ * Configuration is via config parameters (not via system options as for
the
+ * distributed queue.)
+ * <dl>
+ * <dt><tt>drill.queue.embedded.enabled</tt></dt>
+ * <dd>Set to true to enable the embedded queue. But, this setting has
effect
+ * only if the Drillbit is, in fact, embedded.</dd>
+ * <dt><tt>drill.queue.embedded.size</tt></dt>
+ * <dd>The number of active queries, all others queue. There is no upper
limit
+ * on the number of queued entries.</dt>
+ * <dt><tt>drill.queue.embedded.timeout_ms</tt></dt>
+ * <dd>The maximum time a query will wait in the queue before failing.</dd>
+ * </dl>
+ */
+
+public class EmbeddedQueryQueue implements QueryQueue {
+
+ public static String EMBEDDED_QUEUE = "drill.exec.queue.embedded";
+ public static String ENABLED = EMBEDDED_QUEUE + ".enable";
+ public static String QUEUE_SIZE = EMBEDDED_QUEUE + ".size";
+ public static String TIMEOUT_MS = EMBEDDED_QUEUE + ".timeout_ms";
+
+ public class EmbeddedQueueLease implements QueueLease {
+
+ private final QueryId queryId;
+ private boolean released;
+ private long queryMemory;
+
+ public EmbeddedQueueLease(QueryId queryId, long queryMemory) {
+ this.queryId = queryId;
+ this.queryMemory = queryMemory;
+ }
+
+ @Override
+ public String toString( ) {
+ String msg = "Embedded queue lease for " +
+ QueryIdHelper.getQueryId(queryId);
+ if (released) {
+ msg += " (released)";
+ }
+ return msg;
+ }
+
+ @Override
+ public long queryMemoryPerNode() {
+ return queryMemory;
+ }
+
+ @Override
+ public void release() {
+ EmbeddedQueryQueue.this.release(this);
+ }
+
+ @Override
+ public String queueName() { return "local-queue"; }
+ }
+
+ private final int queueTimeoutMs;
+ private final int queueSize;
+ private final Semaphore semaphore;
+ private long memoryPerQuery;
+ private final long minimumOperatorMemory;
+
+ public EmbeddedQueryQueue(DrillbitContext context) {
+ DrillConfig config = context.getConfig();
+ queueTimeoutMs = config.getInt(TIMEOUT_MS);
+ queueSize = config.getInt(QUEUE_SIZE);
+ semaphore = new Semaphore(queueSize, true);
+ minimumOperatorMemory = context.getOptionManager()
+ .getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+ }
+
+ @Override
+ public boolean enabled() { return true; }
+
+ @Override
+ public void setMemoryPerNode(long memoryPerNode) {
+ memoryPerQuery = memoryPerNode / queueSize;
+ }
+
+ @Override
+ public long defaultQueryMemoryPerNode(double cost) {
+ return memoryPerQuery;
+ }
+
+ @Override
+ public QueueLease enqueue(QueryId queryId, double cost)
+ throws QueueTimeoutException, QueryQueueException {
+ try {
+ if (! semaphore.tryAcquire(queueTimeoutMs, TimeUnit.MILLISECONDS) ) {
+ throw new QueueTimeoutException(queryId, "embedded",
queueTimeoutMs);
+ }
+ } catch (InterruptedException e) {
+ throw new QueryQueueException("Interrupted", e);
+ }
+ return new EmbeddedQueueLease(queryId, memoryPerQuery);
+ }
+
+ private void release(QueueLease lease) {
--- End diff --
This method can accept an `EmbeddedQueueLease` parameter (rather than a `QueueLease`) to avoid casting.
> Queue-based memory assignment for buffering operators
> -----------------------------------------------------
>
> Key: DRILL-5716
> URL: https://issues.apache.org/jira/browse/DRILL-5716
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.11.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Apache Drill already has a queueing feature based on ZK semaphores. We did a
> bit of testing to show that the feature does, in fact, work. We propose to
> enhance the feature with some light revisions to make it work with the "managed"
> external sort and the newly-added spilling feature for the hash agg operator.
> The key requirement is to build on what we have for now; we may want to
> tackle a larger project to create a more complete solution later.
> Existing functionality:
> * Two ZK-based queues called the “small” and “large” query queues.
> * A threshold, call it T, given as a query cost, to determine the queue into
> which a query will go.
> * Admit levels for the two queues: call them Qs and Ql.
> Basically, when a query comes in:
> * Plan the query as usual.
> * Obtain the final query cost from the planner, call this C.
> * If C<T, the query goes into the small queue, else it goes into the large
> queue.
> * Suppose the query goes into the small queue. Ask ZK if the query can run.
> * ZK checks if Qs queries are already running. If so, the query waits;
> otherwise, the query runs.
> The proposed changes include:
> * Refactor the code to provide a queueing API that supports a variety of
> queueing mechanisms.
> * Provide three implementations: the null queue (default), an in-process queue
> (for testing), and the ZK queues.
> * Modify the query profile web UI to show two new bits of information about
> queues:
> - The queue to which the query was sent.
> - The total planning cost.
> * Modify the query profile web UI to show two memory assignment numbers:
> - Total memory allocated to the query
> - Memory per sort or hash-agg operator
> Then, add to the queue mechanism the ability to do memory assignment:
> * Provide a weight, W: every small query gets 1 unit, every large query gets
> W units.
> * Use the queue admit levels to determine total units: U = Qs + W * Ql.
> * Obtain total direct memory from the system. M.
> * Subtract a reserve R (computed as a percentage of M) for overhead.
> * Do the math to get the memory per query for each queue:
> * For the small queue: (M - R) / U
> * For the large queue: (M - R) / U * W
> * Use this memory amount as the “memory per query” number in the existing
> sort/hash-agg memory assignment (instead of the fixed 2 GB).
> The result will be a nice incremental addition to what we already have, and
> should make it a bit easier for people to actually use the feature (because they
> can see the planning numbers and see the queues used, allowing them to
> effectively tune the system.)
> The API used for the above features also allows third parties to add on a more
> robust admission control feature as needed, perhaps tying into an existing
> queueing mechanism of their choice.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)