IMPALA-3772: Fix admission control flaky stress test

The admission control stress test could occasionally fail
with some queued queries timing out. At some point the
default statestore topic update rpc frequency was increased
to 2sec, and because the updates trigger the admission
control queue check (to determine if queued queries can be
dequeued), it is possible that the topic updates weren't
frequent enough for a large admission control queue to fully
drain within the 60sec queue timeout.

In the test that failed, queries were submitted round-robin
so the admission control behavior is non-deterministic.
There isn't enough information to prove that this is
definitely the issue, but it seems likely and improving the
test seems valuable regardless.

This change reduces the statestore topic update frequency to
500ms so that state can be shared faster and the queued
requests have more chances to drain within the 60sec queue
timeout. The code was also a bit confusing before because it
was waiting for statestore topic updates but the test had
only changed the heartbeat frequency (which is a different
RPC). This now sets the lower frequency for both and makes
the code a bit more clear.

Change-Id: I235a14c4674240dc0a01dabb664da87c752153cf
Reviewed-on: http://gerrit.cloudera.org:8080/3450
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Matthew Jacobs <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4680252a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4680252a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4680252a

Branch: refs/heads/master
Commit: 4680252a9e4fc9ad63060010050d11e2c8355705
Parents: 55b2a63
Author: Matthew Jacobs <[email protected]>
Authored: Wed Jun 22 13:09:59 2016 -0700
Committer: Taras Bobrovytsky <[email protected]>
Committed: Thu Jul 14 19:04:44 2016 +0000

----------------------------------------------------------------------
 tests/custom_cluster/test_admission_controller.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4680252a/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 5baf35e..dfcdcc7 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -46,8 +46,8 @@ ROUND_ROBIN_SUBMISSION = [True, False]
 # pool with the parameters below.
 POOL_NAME = "default-pool"
 
-# The statestore heartbeat of the impala cluster the test is executing against
-STATESTORE_HEARTBEAT_MS = 500
+# The statestore heartbeat and topic update frequency (ms). Set low for 
testing.
+STATESTORE_RPC_FREQUENCY_MS = 500
 
 # The number of queries that can execute concurrently in the pool POOL_NAME.
 MAX_NUM_CONCURRENT_QUERIES = 5
@@ -58,7 +58,9 @@ MAX_NUM_QUEUED_QUERIES = 10
 # Mem limit (bytes) used in the mem limit test
 MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024
 
-_STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s" % 
(STATESTORE_HEARTBEAT_MS)
+_STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s "\
+                    "-statestore_update_frequency_ms=%s" %\
+                    (STATESTORE_RPC_FREQUENCY_MS, STATESTORE_RPC_FREQUENCY_MS)
 
 # Key in the query profile for the query options.
 PROFILE_QUERY_OPTIONS_KEY = "Query Options (non default): "
@@ -410,7 +412,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
           "Timed out waiting %s seconds for metrics" % (timeout,)
       sleep(1)
 
-  def wait_for_heartbeats(self, heartbeats, timeout=30):
+  def wait_for_statestore_updates(self, heartbeats, timeout=30):
     """Waits for a number of statestore heartbeats from all impalads."""
     start_time = time()
     num_impalads = len(self.impalads)
@@ -422,7 +424,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
       curr[impalad] = init[impalad]
 
     while True:
-      LOG.debug("wait_for_heartbeats: curr=%s, init=%s, d=%s", curr.values(),
+      LOG.debug("wait_for_statestore_updates: curr=%s, init=%s, d=%s", 
curr.values(),
           init.values(), [curr[i] - init[i] for i in self.impalads])
       if all([curr[i] - init[i] >= heartbeats for i in self.impalads]): break
       for impalad in self.impalads:
@@ -430,7 +432,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
             'statestore-subscriber.topic-update-interval-time')['count']
       assert (time() - start_time < timeout),\
           "Timed out waiting %s seconds for heartbeats" % (timeout,)
-      sleep(0.5)
+      sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000))
     LOG.debug("Waited %s for %s heartbeats", round(time() - start_time, 1), 
heartbeats)
 
   def wait_for_admitted_threads(self, num_threads, timeout=30):
@@ -628,9 +630,9 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
       (metric_deltas, _) = self.wait_for_metric_changes(['admitted'], 
curr_metrics,
           expected_admitted)
       self.wait_for_admitted_threads(metric_deltas['admitted'])
-      # Wait a few heartbeats to ensure the admission controllers have reached 
a steady
+      # Wait a few topic updates to ensure the admission controllers have 
reached a steady
       # state or we may find an impalad dequeue more requests after we capture 
metrics.
-      self.wait_for_heartbeats(4)
+      self.wait_for_statestore_updates(10)
 
     final_metrics = self.get_admission_metrics();
     log_metrics("Final metrics: ", final_metrics, logging.INFO);

Reply via email to