DRILL-5922: - The QueryContext was never closed when the Foreman finished, so 
its child allocator was never closed. Now it is. - The PlanSplitter created a 
QueryContext temporarily to construct an RPC message but never closed it. Now 
the temp QueryContext is closed. - The waitToExit method was error-prone. 
Changed it to use the standard condition variable pattern. - Fixed timeouts in 
graceful shutdown tests.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/efed7e30
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/efed7e30
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/efed7e30

Branch: refs/heads/master
Commit: efed7e3060e8054a9223e0a2001236c4d9bcd34d
Parents: 9bb9370
Author: Timothy Farkas <[email protected]>
Authored: Fri Nov 10 11:13:18 2017 -0800
Committer: Parth Chandra <[email protected]>
Committed: Thu Jan 11 17:06:17 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/drill/test/DrillTest.java   |   2 +-
 .../planner/fragment/SimpleParallelizer.java    |   1 -
 .../org/apache/drill/exec/server/Drillbit.java  |   2 +-
 .../org/apache/drill/exec/work/WorkManager.java | 105 +++++++++-------
 .../apache/drill/exec/work/foreman/Foreman.java |   7 +-
 .../drill/exec/work/user/PlanSplitter.java      |  11 +-
 .../apache/drill/test/TestGracefulShutdown.java | 123 ++++++++++++++-----
 7 files changed, 170 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/common/src/test/java/org/apache/drill/test/DrillTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java 
b/common/src/test/java/org/apache/drill/test/DrillTest.java
index de26470..d949d97 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -54,7 +54,7 @@ public class DrillTest {
   static MemWatcher memWatcher;
   static String className;
 
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(100000);
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(100_000);
 
   @Rule public final TestLogReporter logOutcome = LOG_OUTCOME;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index d2efcfb..1ee9ea2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -115,7 +115,6 @@ public class SimpleParallelizer implements 
ParallelizationParameters {
    * @param foremanNode     The driving/foreman node for this query.  (this 
node)
    * @param queryId         The queryId for this query.
    * @param activeEndpoints The list of endpoints to consider for inclusion in 
planning this query.
-   * @param reader          Tool used to read JSON plans
    * @param rootFragment    The root node of the PhysicalPlan that we will be 
parallelizing.
    * @param session         UserSession of user who launched this query.
    * @param queryContextInfo Info related to the context when query has 
started.

http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 4144da0..a734ca1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -230,7 +230,7 @@ public class Drillbit implements AutoCloseable {
     waitForGracePeriod();
     stateManager.setState(DrillbitState.DRAINING);
     // wait for all the in-flight queries to finish
-    manager.waitToExit(this, forcefulShutdown);
+    manager.waitToExit(forcefulShutdown);
     //safe to exit
     registrationHandle = coord.update(registrationHandle, State.OFFLINE);
     stateManager.setState(DrillbitState.OFFLINE);

http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 5d369de..e935819 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.drill.common.SelfCleaningRunnable;
-import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -37,7 +36,6 @@ import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
 import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
@@ -50,9 +48,12 @@ import org.apache.drill.exec.work.user.UserWorker;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Manages the running fragments in a Drillbit. Periodically requests run-time 
stats updates from fragments
@@ -61,12 +62,14 @@ import java.util.concurrent.Executor;
 public class WorkManager implements AutoCloseable {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(WorkManager.class);
 
+  public static final int EXIT_TIMEOUT_MS = 5000;
+
   /*
    * We use a {@see java.util.concurrent.ConcurrentHashMap} because it 
promises never to throw a
    * {@see java.util.ConcurrentModificationException}; we need that because 
the statusThread may
    * iterate over the map while other threads add FragmentExecutors via the 
{@see #WorkerBee}.
    */
-  private final Map<FragmentHandle, FragmentExecutor> runningFragments = new 
ConcurrentHashMap<>();
+  private final ConcurrentMap<FragmentHandle, FragmentExecutor> 
runningFragments = Maps.newConcurrentMap();
 
   private final ConcurrentMap<QueryId, Foreman> queries = 
Maps.newConcurrentMap();
 
@@ -79,8 +82,8 @@ public class WorkManager implements AutoCloseable {
   private final WorkEventBus workBus;
   private final Executor executor;
   private final StatusThread statusThread;
-  private long numOfRunningQueries;
-  private long numOfRunningFragments;
+  private final Lock isEmptyLock = new ReentrantLock();
+  private final Condition isEmptyCondition = isEmptyLock.newCondition();
 
   /**
    * How often the StatusThread collects statistics about running fragments.
@@ -162,51 +165,67 @@ public class WorkManager implements AutoCloseable {
     return dContext;
   }
 
-  private ExtendedLatch exitLatch = null; // used to wait to exit when things 
are still running
+  public void waitToExit(final boolean forcefulShutdown) {
+    isEmptyLock.lock();
 
-  /**
-   * Waits until it is safe to exit. Blocks until all currently running 
fragments have completed.
-   *
-   * <p>This is intended to be used by {@link 
org.apache.drill.exec.server.Drillbit#close()}.</p>
-   */
-  public void waitToExit(Drillbit bit, boolean forcefulShutdown) {
-    synchronized(this) {
-      numOfRunningQueries = queries.size();
-      numOfRunningFragments = runningFragments.size();
-      if ( queries.isEmpty() && runningFragments.isEmpty()) {
-        return;
+    try {
+      if (forcefulShutdown) {
+        final long startTime = System.currentTimeMillis();
+        final long endTime = startTime + EXIT_TIMEOUT_MS;
+        long currentTime;
+
+        while (!areQueriesAndFragmentsEmpty() && (currentTime = 
System.currentTimeMillis()) < endTime) {
+          try {
+            if (!isEmptyCondition.await(endTime - currentTime, 
TimeUnit.MILLISECONDS)) {
+              break;
+            }
+          } catch (InterruptedException e) {
+            logger.error("Interrupted while waiting to exit");
+          }
+        }
+
+        if (!areQueriesAndFragmentsEmpty()) {
+          logger.warn("Timed out after {} millis. Shutting down before all 
fragments and foremen " +
+            "have completed.", EXIT_TIMEOUT_MS);
+
+          for (QueryId queryId: queries.keySet()) {
+            logger.warn("Query {} is still running.", 
QueryIdHelper.getQueryId(queryId));
+          }
+
+          for (FragmentHandle fragmentHandle: runningFragments.keySet()) {
+            logger.warn("Fragment {} is still running.", 
QueryIdHelper.getQueryIdentifier(fragmentHandle));
+          }
+        }
+      } else {
+        while (!areQueriesAndFragmentsEmpty()) {
+          isEmptyCondition.awaitUninterruptibly();
+        }
       }
-      logger.info("Draining " + queries +" queries and "+ runningFragments+" 
fragments.");
-      exitLatch = new ExtendedLatch();
-    }
-    // Wait uninterruptibly until all the queries and running fragments on 
that drillbit goes down
-    // to zero
-    if( forcefulShutdown ) {
-      exitLatch.awaitUninterruptibly(5000);
-    } else {
-      exitLatch.awaitUninterruptibly();
+    } finally {
+      isEmptyLock.unlock();
     }
   }
 
+  private boolean areQueriesAndFragmentsEmpty() {
+    return queries.isEmpty() && runningFragments.isEmpty();
+  }
+
   /**
-   * If it is safe to exit, and the exitLatch is in use, signals it so that 
waitToExit() will
-   * unblock. Logs the number of pending fragments and queries that are 
running on that
-   * drillbit to track the progress of shutdown process.
+   * A thread calling the {@link #waitToExit(boolean)} method is notified when 
a foreman is retired.
    */
   private void indicateIfSafeToExit() {
-    synchronized(this) {
-      if (exitLatch != null) {
-        logger.info("Waiting for "+ queries.size() +" queries to complete 
before shutting down");
-        logger.info("Waiting for "+ runningFragments.size() +" running 
fragments to complete before shutting down");
-        if(runningFragments.size() > numOfRunningFragments|| queries.size() > 
numOfRunningQueries) {
-          logger.info("New Fragments or queries are added while drillbit is 
Shutting down");
-        }
-        if (queries.isEmpty() && runningFragments.isEmpty()) {
-          // Both Queries and Running fragments are empty.
-          // So its safe for the drillbit to exit.
-          exitLatch.countDown();
-        }
+    isEmptyLock.lock();
+    try {
+      logger.info("Waiting for "+ queries.size() +" queries to complete before 
shutting down");
+      logger.info("Waiting for "+ runningFragments.size() +" running fragments 
to complete before shutting down");
+
+      if (!areQueriesAndFragmentsEmpty()) {
+        logger.info("New Fragments or queries are added while drillbit is 
Shutting down");
+      } else {
+        isEmptyCondition.signal();
       }
+    } finally {
+      isEmptyLock.unlock();
     }
   }
   /**
@@ -256,9 +275,9 @@ public class WorkManager implements AutoCloseable {
 
       final QueryId queryId = foreman.getQueryId();
       final boolean wasRemoved = queries.remove(queryId, foreman);
+
       if (!wasRemoved) {
         logger.warn("Couldn't find retiring Foreman for query " + queryId);
-//        throw new IllegalStateException("Couldn't find retiring Foreman for 
query " + queryId);
       }
 
       indicateIfSafeToExit();

http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 391f100..276d839 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -802,12 +802,17 @@ public class Foreman implements Runnable {
       fragmentsRunner.getBee().retireForeman(Foreman.this);
 
       try {
+        queryContext.close();
+      } catch (Exception e) {
+        logger.error("Unable to close query context for query {}", 
QueryIdHelper.getQueryId(queryId), e);
+      }
+
+      try {
         queryManager.close();
       } catch (final Exception e) {
         logger.warn("unable to close query manager", e);
       }
 
-
       queryStateProcessor.close();
 
       try {

http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index 29b3580..f291779 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.util.Pointer;
@@ -64,7 +65,7 @@ public class PlanSplitter {
   public QueryPlanFragments planFragments(DrillbitContext dContext, QueryId 
queryId,
       GetQueryPlanFragments req, UserClientConnection connection) {
     QueryPlanFragments.Builder responseBuilder = 
QueryPlanFragments.newBuilder();
-    QueryContext queryContext = new QueryContext(connection.getSession(), 
dContext, queryId);
+    final QueryContext queryContext = new 
QueryContext(connection.getSession(), dContext, queryId);
 
     responseBuilder.setQueryId(queryId);
 
@@ -79,6 +80,14 @@ public class PlanSplitter {
       responseBuilder.setStatus(QueryState.FAILED);
       responseBuilder.setError(error);
     }
+
+    try {
+      queryContext.close();
+    } catch (Exception e) {
+      logger.error("Error closing QueryContext when getting plan fragments for 
query {}.",
+        QueryIdHelper.getQueryId(queryId), e);
+    }
+
     return responseBuilder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/efed7e30/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java
index c26be22..a1d4eca 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java
@@ -21,10 +21,13 @@ import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.work.WorkManager;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
 
 import java.io.FileWriter;
 import java.io.IOException;
@@ -33,25 +36,26 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Properties;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.fail;
 import java.nio.file.Path;
 import java.io.BufferedWriter;
 
-
+import static org.junit.Assert.fail;
 
 @Category({SlowTest.class})
-public class TestGracefulShutdown extends BaseTestQuery{
+public class TestGracefulShutdown extends BaseTestQuery {
+
+  @Rule
+  public final TestRule TIMEOUT = TestTools.getTimeoutRule(180_000);
+
+  public static final int WAIT_TIMEOUT_MS = WorkManager.EXIT_TIMEOUT_MS + 
30_000;
 
   @BeforeClass
   public static void setUpTestData() throws Exception {
-
     for( int i = 0; i < 300; i++) {
       setupFile(i);
     }
   }
 
-
   public static final Properties WEBSERVER_CONFIGURATION = new Properties() {
     {
       put(ExecConstants.HTTP_ENABLE, true);
@@ -83,8 +87,6 @@ public class TestGracefulShutdown extends BaseTestQuery{
     return builder;
   }
 
-
-
   /*
   Start multiple drillbits and then shutdown a drillbit. Query the online
   endpoints and check if the drillbit still exists.
@@ -96,12 +98,12 @@ public class TestGracefulShutdown extends BaseTestQuery{
     ClusterFixtureBuilder builder = 
ClusterFixture.bareBuilder(dirTestWatcher).withBits(drillbits).withLocalZk();
     enableDrillPortHunting(builder);
 
-    try ( ClusterFixture cluster = builder.build();
-          ClientFixture client = cluster.clientFixture()) {
+    try ( ClusterFixture cluster = builder.build()) {
 
       Drillbit drillbit = cluster.drillbit("db2");
       DrillbitEndpoint drillbitEndpoint =  
drillbit.getRegistrationHandle().getEndPoint();
       int grace_period = 
drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD);
+
       new Thread(new Runnable() {
         public void run() {
           try {
@@ -111,12 +113,28 @@ public class TestGracefulShutdown extends BaseTestQuery{
           }
         }
       }).start();
-      //wait for graceperiod
+
       Thread.sleep(grace_period);
-      Collection<DrillbitEndpoint> drillbitEndpoints = 
cluster.drillbit().getContext()
-              .getClusterCoordinator()
-              .getOnlineEndPoints();
-      Assert.assertFalse(drillbitEndpoints.contains(drillbitEndpoint));
+
+      long currentTime = System.currentTimeMillis();
+      long stopTime = currentTime + WAIT_TIMEOUT_MS;
+
+      while (currentTime < stopTime) {
+        Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit()
+          .getContext()
+          .getClusterCoordinator()
+          .getOnlineEndPoints();
+
+        if (!drillbitEndpoints.contains(drillbitEndpoint)) {
+          // Success
+          return;
+        }
+
+        Thread.sleep(100L);
+        currentTime = System.currentTimeMillis();
+      }
+
+      Assert.fail("Timed out");
     }
   }
   /*
@@ -130,8 +148,7 @@ public class TestGracefulShutdown extends BaseTestQuery{
     ClusterFixtureBuilder builder = 
ClusterFixture.bareBuilder(dirTestWatcher).withBits(drillbits).withLocalZk();
     enableDrillPortHunting(builder);
 
-    try ( ClusterFixture cluster = builder.build();
-          ClientFixture client = cluster.clientFixture()) {
+    try (ClusterFixture cluster = builder.build()) {
       Drillbit drillbit = cluster.drillbit("db2");
       int grace_period = 
drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD);
       DrillbitEndpoint drillbitEndpoint =  
drillbit.getRegistrationHandle().getEndPoint();
@@ -144,15 +161,31 @@ public class TestGracefulShutdown extends BaseTestQuery{
           }
         }
       }).start();
+
       Thread.sleep(grace_period);
-      Collection<DrillbitEndpoint> drillbitEndpoints = 
cluster.drillbit().getContext()
-              .getClusterCoordinator()
-              .getAvailableEndpoints();
-      for (DrillbitEndpoint dbEndpoint : drillbitEndpoints) {
-        if(drillbitEndpoint.getAddress().equals(dbEndpoint.getAddress()) && 
drillbitEndpoint.getUserPort() == dbEndpoint.getUserPort()) {
-          assertNotEquals(dbEndpoint.getState(),DrillbitEndpoint.State.ONLINE);
+
+      long currentTime = System.currentTimeMillis();
+      long stopTime = currentTime + WAIT_TIMEOUT_MS;
+
+      while (currentTime < stopTime) {
+        Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit()
+          .getContext()
+          .getClusterCoordinator()
+          .getAvailableEndpoints();
+        for (DrillbitEndpoint dbEndpoint : drillbitEndpoints) {
+          if (drillbitEndpoint.getAddress().equals(dbEndpoint.getAddress()) && 
drillbitEndpoint.getUserPort() == dbEndpoint.getUserPort()) {
+            if (!dbEndpoint.getState().equals(DrillbitEndpoint.State.ONLINE)) {
+              // Success
+              return;
+            }
+          }
         }
+
+        Thread.sleep(100L);
+        currentTime = System.currentTimeMillis();
       }
+
+      Assert.fail("Timed out");
     }
   }
 
@@ -185,12 +218,23 @@ public class TestGracefulShutdown extends BaseTestQuery{
         port++;
       }
       Thread.sleep(grace_period);
-      Collection<DrillbitEndpoint> drillbitEndpoints = 
cluster.drillbit().getContext()
+      Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit()
+              .getContext()
               .getClusterCoordinator()
               .getOnlineEndPoints();
-      while(!listener.isDone()) {
-        Thread.sleep(10);
+
+      long currentTime = System.currentTimeMillis();
+      long stopTime = currentTime + WAIT_TIMEOUT_MS;
+
+      while (currentTime < stopTime) {
+        if (listener.isDone()) {
+          break;
+        }
+
+        Thread.sleep(100L);
+        currentTime = System.currentTimeMillis();
       }
+
       Assert.assertTrue(listener.isDone());
       Assert.assertEquals(1,drillbitEndpoints.size());
     }
@@ -214,6 +258,7 @@ public class TestGracefulShutdown extends BaseTestQuery{
       int grace_period = 
drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD);
       listener =  client.queryBuilder().sql(sql).futureSummary();
       Thread.sleep(10000);
+
       while( port < 8048) {
         URL url = new URL("http://localhost:"+port+"/shutdown");
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -224,15 +269,27 @@ public class TestGracefulShutdown extends BaseTestQuery{
         }
         port++;
       }
+
       Thread.sleep(grace_period);
-      Thread.sleep(5000);
-      Collection<DrillbitEndpoint> drillbitEndpoints = 
cluster.drillbit().getContext()
-              .getClusterCoordinator().getAvailableEndpoints();
-      while(!listener.isDone()) {
-        Thread.sleep(10);
+
+      Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit()
+              .getContext()
+              .getClusterCoordinator()
+              .getAvailableEndpoints();
+
+      long currentTime = System.currentTimeMillis();
+      long stopTime = currentTime + WAIT_TIMEOUT_MS;
+
+      while (currentTime < stopTime) {
+        if (listener.isDone() && drillbitEndpoints.size() == 2) {
+          return;
+        }
+
+        Thread.sleep(100L);
+        currentTime = System.currentTimeMillis();
       }
-      Assert.assertTrue(listener.isDone());
-      Assert.assertEquals(2,drillbitEndpoints.size());
+
+      Assert.fail("Timed out");
     }
   }
 

Reply via email to