IMPALA-3772: Fix admission control flaky stress test. The admission control stress test could occasionally fail with some queued queries timing out. At some point the default statestore topic update RPC interval was increased to 2 seconds, and because the updates trigger the admission control queue check (to determine whether queued queries can be dequeued), it is possible that the topic updates weren't frequent enough for a large admission control queue to fully drain within the 60-second queue timeout.
In the test that failed, queries were submitted round-robin, so the admission control behavior is non-deterministic. There isn't enough information to prove that this is definitely the issue, but it seems likely, and improving the test seems valuable regardless. This change lowers the statestore topic update interval to 500 ms (i.e. updates are sent more often) so that state can be shared faster and the queued requests have more chances to drain within the 60-second queue timeout. The code was also a bit confusing before because it was waiting for statestore topic updates, but the test had only changed the heartbeat frequency (which is a different RPC). This change now sets the same lower interval for both RPCs and makes the code a bit clearer. Change-Id: I235a14c4674240dc0a01dabb664da87c752153cf Reviewed-on: http://gerrit.cloudera.org:8080/3450 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Matthew Jacobs <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4680252a Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4680252a Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4680252a Branch: refs/heads/master Commit: 4680252a9e4fc9ad63060010050d11e2c8355705 Parents: 55b2a63 Author: Matthew Jacobs <[email protected]> Authored: Wed Jun 22 13:09:59 2016 -0700 Committer: Taras Bobrovytsky <[email protected]> Committed: Thu Jul 14 19:04:44 2016 +0000 ---------------------------------------------------------------------- tests/custom_cluster/test_admission_controller.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4680252a/tests/custom_cluster/test_admission_controller.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py index 5baf35e..dfcdcc7 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -46,8 +46,8 @@ ROUND_ROBIN_SUBMISSION = [True, False] # pool with the parameters below. POOL_NAME = "default-pool" -# The statestore heartbeat of the impala cluster the test is executing against -STATESTORE_HEARTBEAT_MS = 500 +# The statestore heartbeat and topic update frequency (ms). Set low for testing. +STATESTORE_RPC_FREQUENCY_MS = 500 # The number of queries that can execute concurrently in the pool POOL_NAME. MAX_NUM_CONCURRENT_QUERIES = 5 @@ -58,7 +58,9 @@ MAX_NUM_QUEUED_QUERIES = 10 # Mem limit (bytes) used in the mem limit test MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024 -_STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s" % (STATESTORE_HEARTBEAT_MS) +_STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s "\ + "-statestore_update_frequency_ms=%s" %\ + (STATESTORE_RPC_FREQUENCY_MS, STATESTORE_RPC_FREQUENCY_MS) # Key in the query profile for the query options. 
PROFILE_QUERY_OPTIONS_KEY = "Query Options (non default): " @@ -410,7 +412,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): "Timed out waiting %s seconds for metrics" % (timeout,) sleep(1) - def wait_for_heartbeats(self, heartbeats, timeout=30): + def wait_for_statestore_updates(self, heartbeats, timeout=30): """Waits for a number of statestore heartbeats from all impalads.""" start_time = time() num_impalads = len(self.impalads) @@ -422,7 +424,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): curr[impalad] = init[impalad] while True: - LOG.debug("wait_for_heartbeats: curr=%s, init=%s, d=%s", curr.values(), + LOG.debug("wait_for_statestore_updates: curr=%s, init=%s, d=%s", curr.values(), init.values(), [curr[i] - init[i] for i in self.impalads]) if all([curr[i] - init[i] >= heartbeats for i in self.impalads]): break for impalad in self.impalads: @@ -430,7 +432,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): 'statestore-subscriber.topic-update-interval-time')['count'] assert (time() - start_time < timeout),\ "Timed out waiting %s seconds for heartbeats" % (timeout,) - sleep(0.5) + sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000)) LOG.debug("Waited %s for %s heartbeats", round(time() - start_time, 1), heartbeats) def wait_for_admitted_threads(self, num_threads, timeout=30): @@ -628,9 +630,9 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): (metric_deltas, _) = self.wait_for_metric_changes(['admitted'], curr_metrics, expected_admitted) self.wait_for_admitted_threads(metric_deltas['admitted']) - # Wait a few heartbeats to ensure the admission controllers have reached a steady + # Wait a few topic updates to ensure the admission controllers have reached a steady # state or we may find an impalad dequeue more requests after we capture metrics. 
- self.wait_for_heartbeats(4) + self.wait_for_statestore_updates(10) final_metrics = self.get_admission_metrics(); log_metrics("Final metrics: ", final_metrics, logging.INFO);
