[
https://issues.apache.org/jira/browse/DRILL-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739620#comment-14739620
]
ASF GitHub Bot commented on DRILL-1942:
---------------------------------------
Github user sudheeshkatkam commented on a diff in the pull request:
https://github.com/apache/drill/pull/105#discussion_r39214895
--- Diff:
exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
---
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import java.io.IOException;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/*
+ * Note that the real interest here is that the drillbit doesn't become
+ * unstable from running a lot of queries concurrently -- it's not about
+ * any particular order of execution. We ignore the results.
+ */
+public class TestTpchDistributedConcurrent extends BaseTestQuery {
+ @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000);
// Longer timeout than usual.
+
+ /*
+ * Valid test names taken from TestTpchDistributed. Fuller path prefixes
are
+ * used so that tests may also be taken from other locations -- more
variety
+ * is better as far as this test goes.
+ */
+ private final static String queryFile[] = {
+ "queries/tpch/01.sql",
+ "queries/tpch/03.sql",
+ "queries/tpch/04.sql",
+ "queries/tpch/05.sql",
+ "queries/tpch/06.sql",
+ "queries/tpch/07.sql",
+ "queries/tpch/08.sql",
+ "queries/tpch/09.sql",
+ "queries/tpch/10.sql",
+ "queries/tpch/11.sql",
+ "queries/tpch/12.sql",
+ "queries/tpch/13.sql",
+ "queries/tpch/14.sql",
+ // "queries/tpch/15.sql", this creates a view
+ "queries/tpch/16.sql",
+ "queries/tpch/18.sql",
+ "queries/tpch/19_1.sql",
+ "queries/tpch/20.sql",
+ };
+
+ private final static int TOTAL_QUERIES = 115;
+ private final static int CONCURRENT_QUERIES = 15;
+
+ private final static Random random = new Random(0xdeadbeef);
+ private final static String alterSession = "alter session set
`planner.slice_target` = 10";
+
+ private int remainingQueries = TOTAL_QUERIES - CONCURRENT_QUERIES;
+ private final Semaphore completionSemaphore = new Semaphore(0);
+ private final Semaphore submissionSemaphore = new Semaphore(0);
+ private final IdentityHashMap<UserResultsListener, Object> listeners =
new IdentityHashMap<>();
+ private Thread testThread = null; // used to interrupt semaphore wait in
case of error
+
+ private static class FailedQuery {
+ final String queryFile;
+ final UserException userEx;
+
+ public FailedQuery(final String queryFile, final UserException userEx)
{
+ this.queryFile = queryFile;
+ this.userEx = userEx;
+ }
+ }
+
+ private final List<FailedQuery> failedQueries = new LinkedList<>();
+
+ private void submitRandomQuery() {
+ final String filename = queryFile[random.nextInt(queryFile.length)];
+ final String query;
+ try {
+ query = QueryTestUtil.normalizeQuery(getFile(filename)).replace(';',
' ');
+ } catch(IOException e) {
+ throw new RuntimeException("Caught exception", e);
+ }
+ final UserResultsListener listener = new ChainingSilentListener(query);
+ client.runQuery(UserBitShared.QueryType.SQL, query, listener);
+ synchronized(this) {
+ listeners.put(listener, listener);
+ }
+ }
+
+ private class ChainingSilentListener extends SilentListener {
+ private final String query;
+
+ public ChainingSilentListener(final String query) {
+ this.query = query;
+ }
+
+ @Override
+ public void queryCompleted(QueryState state) {
+ super.queryCompleted(state);
+
+ completionSemaphore.release();
+ synchronized(TestTpchDistributedConcurrent.this) {
+ final Object object = listeners.remove(this);
+ assertNotNull("listener not found", object);
+
+ /* Only submit more queries if there hasn't been an error. */
+ if ((failedQueries.size() == 0) && (remainingQueries > 0)) {
+ /*
+ * We can't directly submit the query from here, because we're
on the RPC
+ * thread, and it throws an exception if we try to send from
here. So we
+ * allow the QuerySubmitter thread to advance.
+ */
+ submissionSemaphore.release();
+ --remainingQueries;
+ }
+ }
+ }
+
+ @Override
+ public void submissionFailed(UserException uex) {
+ super.submissionFailed(uex);
+
+ completionSemaphore.release();
+ System.out.println("submissionFailed for " + query + "\nwith " +
uex);
+ synchronized(TestTpchDistributedConcurrent.this) {
+ final Object object = listeners.remove(this);
+ assertNotNull("listener not found", object);
+ failedQueries.add(new FailedQuery(query, uex));
+ testThread.interrupt();
+ }
+ }
+ }
+
+ private class QuerySubmitter extends Thread {
+ @Override
+ public void run() {
+ while(true) {
+ try {
+ submissionSemaphore.acquire();
+ } catch(InterruptedException e) {
+ System.out.println("QuerySubmitter quitting.");
+ return;
+ }
+
+ submitRandomQuery();
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentQueries() throws Exception {
+ QueryTestUtil.testRunAndPrint(client, UserBitShared.QueryType.SQL,
alterSession);
+
+ testThread = Thread.currentThread();
+ final QuerySubmitter querySubmitter = new QuerySubmitter();
+ querySubmitter.start();
+
+ // Kick off the initial queries. As they complete, they will submit
more.
+ submissionSemaphore.release(CONCURRENT_QUERIES);
+
+ // Wait for all the queries to complete.
+ try {
+ completionSemaphore.acquire(TOTAL_QUERIES);
+ } catch(InterruptedException e) {
+ fail("caught " + e);
--- End diff --
If ```testThread``` is interrupted, this fail will trigger an
```AssertionError("...")```, however ```querySubmitter``` is not interrupted,
and the ```System.err.println("...") ``` calls do not happen, right?
> Improve off-heap memory usage tracking
> --------------------------------------
>
> Key: DRILL-1942
> URL: https://issues.apache.org/jira/browse/DRILL-1942
> Project: Apache Drill
> Issue Type: Improvement
> Components: Execution - Relational Operators
> Reporter: Chris Westin
> Assignee: Chris Westin
> Fix For: 1.2.0
>
> Attachments: DRILL-1942.1.patch.txt, DRILL-1942.2.patch.txt,
> DRILL-1942.3.patch.txt
>
>
> We're using a lot more memory than we think we should. We may be leaking it,
> or not releasing it as soon as we could.
> This is a call to come up with some improved tracking so that we can get
> statistics out about exactly where we're using it, and whether or not we can
> release it earlier.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)