DRILL-5922: - The QueryContext was never closed when the Foreman finished, so its child allocator was never closed. Now it is. - The PlanSplitter created a QueryContext temporarily to construct an RPC message but never closed it. Now the temporary QueryContext is closed. - The waitToExit method was error-prone. Changed it to use the standard condition variable pattern. - Fixed timeouts in graceful shutdown tests.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/efed7e30 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/efed7e30 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/efed7e30 Branch: refs/heads/master Commit: efed7e3060e8054a9223e0a2001236c4d9bcd34d Parents: 9bb9370 Author: Timothy Farkas <[email protected]> Authored: Fri Nov 10 11:13:18 2017 -0800 Committer: Parth Chandra <[email protected]> Committed: Thu Jan 11 17:06:17 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/drill/test/DrillTest.java | 2 +- .../planner/fragment/SimpleParallelizer.java | 1 - .../org/apache/drill/exec/server/Drillbit.java | 2 +- .../org/apache/drill/exec/work/WorkManager.java | 105 +++++++++------- .../apache/drill/exec/work/foreman/Foreman.java | 7 +- .../drill/exec/work/user/PlanSplitter.java | 11 +- .../apache/drill/test/TestGracefulShutdown.java | 123 ++++++++++++++----- 7 files changed, 170 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/common/src/test/java/org/apache/drill/test/DrillTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java index de26470..d949d97 100644 --- a/common/src/test/java/org/apache/drill/test/DrillTest.java +++ b/common/src/test/java/org/apache/drill/test/DrillTest.java @@ -54,7 +54,7 @@ public class DrillTest { static MemWatcher memWatcher; static String className; - @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(100000); + @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(100_000); @Rule public final TestLogReporter logOutcome = LOG_OUTCOME; 
http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index d2efcfb..1ee9ea2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -115,7 +115,6 @@ public class SimpleParallelizer implements ParallelizationParameters { * @param foremanNode The driving/foreman node for this query. (this node) * @param queryId The queryId for this query. * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query. - * @param reader Tool used to read JSON plans * @param rootFragment The root node of the PhysicalPlan that we will be parallelizing. * @param session UserSession of user who launched this query. * @param queryContextInfo Info related to the context when query has started. 
http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 4144da0..a734ca1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -230,7 +230,7 @@ public class Drillbit implements AutoCloseable { waitForGracePeriod(); stateManager.setState(DrillbitState.DRAINING); // wait for all the in-flight queries to finish - manager.waitToExit(this, forcefulShutdown); + manager.waitToExit(forcefulShutdown); //safe to exit registrationHandle = coord.update(registrationHandle, State.OFFLINE); stateManager.setState(DrillbitState.OFFLINE); http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 5d369de..e935819 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.drill.common.SelfCleaningRunnable; -import org.apache.drill.common.concurrent.ExtendedLatch; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.proto.BitControl.FragmentStatus; @@ -37,7 +36,6 @@ import org.apache.drill.exec.rpc.control.Controller; 
import org.apache.drill.exec.rpc.control.WorkEventBus; import org.apache.drill.exec.rpc.data.DataConnectionCreator; import org.apache.drill.exec.server.BootStrapContext; -import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.work.batch.ControlMessageHandler; @@ -50,9 +48,12 @@ import org.apache.drill.exec.work.user.UserWorker; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Manages the running fragments in a Drillbit. Periodically requests run-time stats updates from fragments @@ -61,12 +62,14 @@ import java.util.concurrent.Executor; public class WorkManager implements AutoCloseable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class); + public static final int EXIT_TIMEOUT_MS = 5000; + /* * We use a {@see java.util.concurrent.ConcurrentHashMap} because it promises never to throw a * {@see java.util.ConcurrentModificationException}; we need that because the statusThread may * iterate over the map while other threads add FragmentExecutors via the {@see #WorkerBee}. 
*/ - private final Map<FragmentHandle, FragmentExecutor> runningFragments = new ConcurrentHashMap<>(); + private final ConcurrentMap<FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap(); private final ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap(); @@ -79,8 +82,8 @@ public class WorkManager implements AutoCloseable { private final WorkEventBus workBus; private final Executor executor; private final StatusThread statusThread; - private long numOfRunningQueries; - private long numOfRunningFragments; + private final Lock isEmptyLock = new ReentrantLock(); + private final Condition isEmptyCondition = isEmptyLock.newCondition(); /** * How often the StatusThread collects statistics about running fragments. @@ -162,51 +165,67 @@ public class WorkManager implements AutoCloseable { return dContext; } - private ExtendedLatch exitLatch = null; // used to wait to exit when things are still running + public void waitToExit(final boolean forcefulShutdown) { + isEmptyLock.lock(); - /** - * Waits until it is safe to exit. Blocks until all currently running fragments have completed. 
- * - * <p>This is intended to be used by {@link org.apache.drill.exec.server.Drillbit#close()}.</p> - */ - public void waitToExit(Drillbit bit, boolean forcefulShutdown) { - synchronized(this) { - numOfRunningQueries = queries.size(); - numOfRunningFragments = runningFragments.size(); - if ( queries.isEmpty() && runningFragments.isEmpty()) { - return; + try { + if (forcefulShutdown) { + final long startTime = System.currentTimeMillis(); + final long endTime = startTime + EXIT_TIMEOUT_MS; + long currentTime; + + while (!areQueriesAndFragmentsEmpty() && (currentTime = System.currentTimeMillis()) < endTime) { + try { + if (!isEmptyCondition.await(endTime - currentTime, TimeUnit.MILLISECONDS)) { + break; + } + } catch (InterruptedException e) { + logger.error("Interrupted while waiting to exit"); + } + } + + if (!areQueriesAndFragmentsEmpty()) { + logger.warn("Timed out after {} millis. Shutting down before all fragments and foremen " + + "have completed.", EXIT_TIMEOUT_MS); + + for (QueryId queryId: queries.keySet()) { + logger.warn("Query {} is still running.", QueryIdHelper.getQueryId(queryId)); + } + + for (FragmentHandle fragmentHandle: runningFragments.keySet()) { + logger.warn("Fragment {} is still running.", QueryIdHelper.getQueryIdentifier(fragmentHandle)); + } + } + } else { + while (!areQueriesAndFragmentsEmpty()) { + isEmptyCondition.awaitUninterruptibly(); + } } - logger.info("Draining " + queries +" queries and "+ runningFragments+" fragments."); - exitLatch = new ExtendedLatch(); - } - // Wait uninterruptibly until all the queries and running fragments on that drillbit goes down - // to zero - if( forcefulShutdown ) { - exitLatch.awaitUninterruptibly(5000); - } else { - exitLatch.awaitUninterruptibly(); + } finally { + isEmptyLock.unlock(); } } + private boolean areQueriesAndFragmentsEmpty() { + return queries.isEmpty() && runningFragments.isEmpty(); + } + /** - * If it is safe to exit, and the exitLatch is in use, signals it so that waitToExit() will - 
* unblock. Logs the number of pending fragments and queries that are running on that - * drillbit to track the progress of shutdown process. + * A thread calling the {@link #waitToExit(boolean)} method is notified when a foreman is retired. */ private void indicateIfSafeToExit() { - synchronized(this) { - if (exitLatch != null) { - logger.info("Waiting for "+ queries.size() +" queries to complete before shutting down"); - logger.info("Waiting for "+ runningFragments.size() +" running fragments to complete before shutting down"); - if(runningFragments.size() > numOfRunningFragments|| queries.size() > numOfRunningQueries) { - logger.info("New Fragments or queries are added while drillbit is Shutting down"); - } - if (queries.isEmpty() && runningFragments.isEmpty()) { - // Both Queries and Running fragments are empty. - // So its safe for the drillbit to exit. - exitLatch.countDown(); - } + isEmptyLock.lock(); + try { + logger.info("Waiting for "+ queries.size() +" queries to complete before shutting down"); + logger.info("Waiting for "+ runningFragments.size() +" running fragments to complete before shutting down"); + + if (!areQueriesAndFragmentsEmpty()) { + logger.info("New Fragments or queries are added while drillbit is Shutting down"); + } else { + isEmptyCondition.signal(); } + } finally { + isEmptyLock.unlock(); } } /** @@ -256,9 +275,9 @@ public class WorkManager implements AutoCloseable { final QueryId queryId = foreman.getQueryId(); final boolean wasRemoved = queries.remove(queryId, foreman); + if (!wasRemoved) { logger.warn("Couldn't find retiring Foreman for query " + queryId); -// throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId); } indicateIfSafeToExit(); http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 391f100..276d839 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -802,12 +802,17 @@ public class Foreman implements Runnable { fragmentsRunner.getBee().retireForeman(Foreman.this); try { + queryContext.close(); + } catch (Exception e) { + logger.error("Unable to close query context for query {}", QueryIdHelper.getQueryId(queryId), e); + } + + try { queryManager.close(); } catch (final Exception e) { logger.warn("unable to close query manager", e); } - queryStateProcessor.close(); try { http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java index 29b3580..f291779 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments; import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.util.Pointer; @@ -64,7 +65,7 @@ public class PlanSplitter { public QueryPlanFragments planFragments(DrillbitContext dContext, QueryId queryId, 
GetQueryPlanFragments req, UserClientConnection connection) { QueryPlanFragments.Builder responseBuilder = QueryPlanFragments.newBuilder(); - QueryContext queryContext = new QueryContext(connection.getSession(), dContext, queryId); + final QueryContext queryContext = new QueryContext(connection.getSession(), dContext, queryId); responseBuilder.setQueryId(queryId); @@ -79,6 +80,14 @@ public class PlanSplitter { responseBuilder.setStatus(QueryState.FAILED); responseBuilder.setError(error); } + + try { + queryContext.close(); + } catch (Exception e) { + logger.error("Error closing QueryContext when getting plan fragments for query {}.", + QueryIdHelper.getQueryId(queryId), e); + } + return responseBuilder.build(); } http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java index c26be22..a1d4eca 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java @@ -21,10 +21,13 @@ import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.work.WorkManager; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestRule; import java.io.FileWriter; import java.io.IOException; @@ -33,25 +36,26 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.Collection; import java.util.Properties; -import static org.junit.Assert.assertNotEquals; 
-import static org.junit.Assert.fail; import java.nio.file.Path; import java.io.BufferedWriter; - +import static org.junit.Assert.fail; @Category({SlowTest.class}) -public class TestGracefulShutdown extends BaseTestQuery{ +public class TestGracefulShutdown extends BaseTestQuery { + + @Rule + public final TestRule TIMEOUT = TestTools.getTimeoutRule(180_000); + + public static final int WAIT_TIMEOUT_MS = WorkManager.EXIT_TIMEOUT_MS + 30_000; @BeforeClass public static void setUpTestData() throws Exception { - for( int i = 0; i < 300; i++) { setupFile(i); } } - public static final Properties WEBSERVER_CONFIGURATION = new Properties() { { put(ExecConstants.HTTP_ENABLE, true); @@ -83,8 +87,6 @@ public class TestGracefulShutdown extends BaseTestQuery{ return builder; } - - /* Start multiple drillbits and then shutdown a drillbit. Query the online endpoints and check if the drillbit still exists. @@ -96,12 +98,12 @@ public class TestGracefulShutdown extends BaseTestQuery{ ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withBits(drillbits).withLocalZk(); enableDrillPortHunting(builder); - try ( ClusterFixture cluster = builder.build(); - ClientFixture client = cluster.clientFixture()) { + try ( ClusterFixture cluster = builder.build()) { Drillbit drillbit = cluster.drillbit("db2"); DrillbitEndpoint drillbitEndpoint = drillbit.getRegistrationHandle().getEndPoint(); int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); + new Thread(new Runnable() { public void run() { try { @@ -111,12 +113,28 @@ public class TestGracefulShutdown extends BaseTestQuery{ } } }).start(); - //wait for graceperiod + Thread.sleep(grace_period); - Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext() - .getClusterCoordinator() - .getOnlineEndPoints(); - Assert.assertFalse(drillbitEndpoints.contains(drillbitEndpoint)); + + long currentTime = System.currentTimeMillis(); + long stopTime = currentTime + 
WAIT_TIMEOUT_MS; + + while (currentTime < stopTime) { + Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit() + .getContext() + .getClusterCoordinator() + .getOnlineEndPoints(); + + if (!drillbitEndpoints.contains(drillbitEndpoint)) { + // Success + return; + } + + Thread.sleep(100L); + currentTime = System.currentTimeMillis(); + } + + Assert.fail("Timed out"); } } /* @@ -130,8 +148,7 @@ public class TestGracefulShutdown extends BaseTestQuery{ ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withBits(drillbits).withLocalZk(); enableDrillPortHunting(builder); - try ( ClusterFixture cluster = builder.build(); - ClientFixture client = cluster.clientFixture()) { + try (ClusterFixture cluster = builder.build()) { Drillbit drillbit = cluster.drillbit("db2"); int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); DrillbitEndpoint drillbitEndpoint = drillbit.getRegistrationHandle().getEndPoint(); @@ -144,15 +161,31 @@ public class TestGracefulShutdown extends BaseTestQuery{ } } }).start(); + Thread.sleep(grace_period); - Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext() - .getClusterCoordinator() - .getAvailableEndpoints(); - for (DrillbitEndpoint dbEndpoint : drillbitEndpoints) { - if(drillbitEndpoint.getAddress().equals(dbEndpoint.getAddress()) && drillbitEndpoint.getUserPort() == dbEndpoint.getUserPort()) { - assertNotEquals(dbEndpoint.getState(),DrillbitEndpoint.State.ONLINE); + + long currentTime = System.currentTimeMillis(); + long stopTime = currentTime + WAIT_TIMEOUT_MS; + + while (currentTime < stopTime) { + Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit() + .getContext() + .getClusterCoordinator() + .getAvailableEndpoints(); + for (DrillbitEndpoint dbEndpoint : drillbitEndpoints) { + if (drillbitEndpoint.getAddress().equals(dbEndpoint.getAddress()) && drillbitEndpoint.getUserPort() == dbEndpoint.getUserPort()) { + if 
(!dbEndpoint.getState().equals(DrillbitEndpoint.State.ONLINE)) { + // Success + return; + } + } } + + Thread.sleep(100L); + currentTime = System.currentTimeMillis(); } + + Assert.fail("Timed out"); } } @@ -185,12 +218,23 @@ public class TestGracefulShutdown extends BaseTestQuery{ port++; } Thread.sleep(grace_period); - Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext() + Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit() + .getContext() .getClusterCoordinator() .getOnlineEndPoints(); - while(!listener.isDone()) { - Thread.sleep(10); + + long currentTime = System.currentTimeMillis(); + long stopTime = currentTime + WAIT_TIMEOUT_MS; + + while (currentTime < stopTime) { + if (listener.isDone()) { + break; + } + + Thread.sleep(100L); + currentTime = System.currentTimeMillis(); } + Assert.assertTrue(listener.isDone()); Assert.assertEquals(1,drillbitEndpoints.size()); } @@ -214,6 +258,7 @@ public class TestGracefulShutdown extends BaseTestQuery{ int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); listener = client.queryBuilder().sql(sql).futureSummary(); Thread.sleep(10000); + while( port < 8048) { URL url = new URL("http://localhost:"+port+"/shutdown"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); @@ -224,15 +269,27 @@ public class TestGracefulShutdown extends BaseTestQuery{ } port++; } + Thread.sleep(grace_period); - Thread.sleep(5000); - Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext() - .getClusterCoordinator().getAvailableEndpoints(); - while(!listener.isDone()) { - Thread.sleep(10); + + Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit() + .getContext() + .getClusterCoordinator() + .getAvailableEndpoints(); + + long currentTime = System.currentTimeMillis(); + long stopTime = currentTime + WAIT_TIMEOUT_MS; + + while (currentTime < stopTime) { + if (listener.isDone() && drillbitEndpoints.size() == 2) { + return; + } 
+ + Thread.sleep(100L); + currentTime = System.currentTimeMillis(); } - Assert.assertTrue(listener.isDone()); - Assert.assertEquals(2,drillbitEndpoints.size()); + + Assert.fail("Timed out"); } }
