IMPALA-7476: Use context to limit scope of StatestoreSubscriber

StatestoreSubscribers are outliving the end of test_statestore.py,
causing continued interaction with statestored and a lot of log spew
in statestored.INFO and the main test log.
This change makes StatestoreSubscriber a Python context manager and
updates all the tests to use it. This ensures that each
StatestoreSubscriber runs kill() at the end of its test.

There is also a circular reference between StatestoreSubscriber and
KillableThreadedServer that prevents cleanup. This change breaks the
cycle by clearing KillableThreadedServer's reference to its processor
(which holds the StatestoreSubscriber) in
KillableThreadedServer.shutdown().

Testing:
- Verified that the log spew is gone.
- StatestoreSubscriber.__del__ is now called for 16 out of 17 tests
  (exception: test_hung_heartbeat); before this change it was called
  for none.

Change-Id: Ie71fac6095cd94343f68b586238aed410ebbca39
Reviewed-on: http://gerrit.cloudera.org:8080/11326
Reviewed-by: Joe McDonnell <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a8f8c8d6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a8f8c8d6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a8f8c8d6

Branch: refs/heads/master
Commit: a8f8c8d6f58d750c6949ff89e6f88f608d1ce462
Parents: 71b36fe
Author: Joe McDonnell <[email protected]>
Authored: Thu Aug 23 15:24:09 2018 -0700
Committer: Joe McDonnell <[email protected]>
Committed: Sat Aug 25 19:26:34 2018 +0000

----------------------------------------------------------------------
 tests/statestore/test_statestore.py | 262 ++++++++++++++++---------------
 1 file changed, 137 insertions(+), 125 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/impala/blob/a8f8c8d6/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index d45deeb..8f26b63 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -104,6 +104,9 @@ class
KillableThreadedServer(TServer): self.is_shutdown = True self.serverTransport.close() self.wait_until_down() + # The processor contains a reference to a StatestoreSubscriber. Clean up that + # reference to avoid a circular reference that would prevent object deletion. + self.processor = None def wait_until_up(self, num_tries=10): for i in xrange(num_tries): @@ -180,6 +183,13 @@ class StatestoreSubscriber(object): self.subscriber_id = "python-test-client-%s" % uuid.uuid1() self.exception = None + def __enter__(self): + return self + + def __exit__(self, *args): + self.kill() + self.wait_for_failure() + def Heartbeat(self, args): """Heartbeat RPC handler. Calls heartbeat callback if one exists.""" self.heartbeat_event.acquire() @@ -241,8 +251,10 @@ class StatestoreSubscriber(object): def kill(self): """Closes both the server and client sockets, and waits for the server to become unavailable""" - self.client_transport.close() - self.server.shutdown() + if self.client_transport: + self.client_transport.close() + if self.server: + self.server.shutdown() return self def start(self): @@ -335,17 +347,17 @@ class TestStatestore(): def test_registration_ids_different(self): """Test that if a subscriber with the same id registers twice, the registration ID is different""" - sub = StatestoreSubscriber() - sub.start().register() - old_reg_id = sub.registration_id - sub.register() - assert old_reg_id != sub.registration_id + with StatestoreSubscriber() as sub: + sub.start().register() + old_reg_id = sub.registration_id + sub.register() + assert old_reg_id != sub.registration_id def test_receive_heartbeats(self): """Smoke test to confirm that heartbeats get sent to a correctly registered subscriber""" - sub = StatestoreSubscriber() - sub.start().register().wait_for_heartbeat(5) + with StatestoreSubscriber() as sub: + sub.start().register().wait_for_heartbeat(5) def test_receive_updates(self): """Test that updates are correctly received when a subscriber alters a topic""" @@ 
-370,13 +382,13 @@ class TestStatestore(): return DEFAULT_UPDATE_STATE_RESPONSE - sub = StatestoreSubscriber(update_cb=topic_update_correct) - reg = TTopicRegistration(topic_name=topic_name, is_transient=False) - ( - sub.start() - .register(topics=[reg]) - .wait_for_update(topic_name, 3) - ) + with StatestoreSubscriber(update_cb=topic_update_correct) as sub: + reg = TTopicRegistration(topic_name=topic_name, is_transient=False) + ( + sub.start() + .register(topics=[reg]) + .wait_for_update(topic_name, 3) + ) def test_filter_prefix(self): topic_name = "topic_delta_%s" % uuid.uuid1() @@ -418,14 +430,14 @@ class TestStatestore(): return DEFAULT_UPDATE_STATE_RESPONSE - sub = StatestoreSubscriber(update_cb=topic_update_correct) - reg = TTopicRegistration(topic_name=topic_name, is_transient=False, - filter_prefix="bar") - ( - sub.start() - .register(topics=[reg]) - .wait_for_update(topic_name, 5) - ) + with StatestoreSubscriber(update_cb=topic_update_correct) as sub: + reg = TTopicRegistration(topic_name=topic_name, is_transient=False, + filter_prefix="bar") + ( + sub.start() + .register(topics=[reg]) + .wait_for_update(topic_name, 5) + ) def test_update_is_delta(self): """Test that the 'is_delta' flag is correctly set. 
The first update for a topic should @@ -452,13 +464,13 @@ class TestStatestore(): return DEFAULT_UPDATE_STATE_RESPONSE - sub = StatestoreSubscriber(update_cb=check_delta) - reg = TTopicRegistration(topic_name=topic_name, is_transient=False) - ( - sub.start() - .register(topics=[reg]) - .wait_for_update(topic_name, 3) - ) + with StatestoreSubscriber(update_cb=check_delta) as sub: + reg = TTopicRegistration(topic_name=topic_name, is_transient=False) + ( + sub.start() + .register(topics=[reg]) + .wait_for_update(topic_name, 3) + ) def test_skipped(self): """Test that skipping an update causes it to be resent""" @@ -478,39 +490,39 @@ class TestStatestore(): assert len(args.topic_deltas[topic_name].topic_entries) == 1 return TUpdateStateResponse(status=STATUS_OK, skipped=True) - sub = StatestoreSubscriber(update_cb=check_skipped) - reg = TTopicRegistration(topic_name=topic_name, is_transient=False) - ( - sub.start() - .register(topics=[reg]) - .wait_for_update(topic_name, 3) - ) + with StatestoreSubscriber(update_cb=check_skipped) as sub: + reg = TTopicRegistration(topic_name=topic_name, is_transient=False) + ( + sub.start() + .register(topics=[reg]) + .wait_for_update(topic_name, 3) + ) def test_failure_detected(self): - sub = StatestoreSubscriber() - topic_name = "test_failure_detected" - reg = TTopicRegistration(topic_name=topic_name, is_transient=True) - ( - sub.start() - .register(topics=[reg]) - .wait_for_update(topic_name, 1) - .kill() - .wait_for_failure() - ) + with StatestoreSubscriber() as sub: + topic_name = "test_failure_detected" + reg = TTopicRegistration(topic_name=topic_name, is_transient=True) + ( + sub.start() + .register(topics=[reg]) + .wait_for_update(topic_name, 1) + .kill() + .wait_for_failure() + ) def test_hung_heartbeat(self): """Test for IMPALA-1712: If heartbeats hang (which we simulate by sleeping for five minutes) the statestore should time them out every 3s and then eventually fail after 40s (10 times (3 + 1), where the 1 is the 
inter-heartbeat delay)""" - sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300)) - topic_name = "test_hung_heartbeat" - reg = TTopicRegistration(topic_name=topic_name, is_transient=True) - ( - sub.start() - .register(topics=[reg]) - .wait_for_update(topic_name, 1) - .wait_for_failure(timeout=60) - ) + with StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300)) as sub: + topic_name = "test_hung_heartbeat" + reg = TTopicRegistration(topic_name=topic_name, is_transient=True) + ( + sub.start() + .register(topics=[reg]) + .wait_for_update(topic_name, 1) + .wait_for_failure(timeout=60) + ) def test_topic_persistence(self): """Test that persistent topic entries survive subscriber failure, but transent topic @@ -551,23 +563,23 @@ class TestStatestore(): reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False), TTopicRegistration(topic_name=transient_topic_name, is_transient=True)] - sub = StatestoreSubscriber(update_cb=add_entries) - ( - sub.start() - .register(topics=reg) - .wait_for_update(persistent_topic_name, 2) - .wait_for_update(transient_topic_name, 2) - .kill() - .wait_for_failure() - ) - - sub2 = StatestoreSubscriber(update_cb=check_entries) - ( - sub2.start() - .register(topics=reg) - .wait_for_update(persistent_topic_name, 1) - .wait_for_update(transient_topic_name, 1) - ) + with StatestoreSubscriber(update_cb=add_entries) as sub: + ( + sub.start() + .register(topics=reg) + .wait_for_update(persistent_topic_name, 2) + .wait_for_update(transient_topic_name, 2) + .kill() + .wait_for_failure() + ) + + with StatestoreSubscriber(update_cb=check_entries) as sub2: + ( + sub2.start() + .register(topics=reg) + .wait_for_update(persistent_topic_name, 1) + .wait_for_update(transient_topic_name, 1) + ) def test_update_with_clear_entries_flag(self): """Test that the statestore clears all topic entries when a subscriber @@ -598,47 +610,47 @@ class TestStatestore(): return DEFAULT_UPDATE_STATE_RESPONSE reg = 
[TTopicRegistration(topic_name=topic_name, is_transient=False)] - sub1 = StatestoreSubscriber(update_cb=add_entries) - ( - sub1.start() - .register(topics=reg) - .wait_for_update(topic_name, 1) - .kill() - .wait_for_failure() - .start() - .register(topics=reg) - .wait_for_update(topic_name, 2) - ) - - sub2 = StatestoreSubscriber(update_cb=check_entries) - ( - sub2.start() - .register(topics=reg) - .wait_for_update(topic_name, 2) - ) + with StatestoreSubscriber(update_cb=add_entries) as sub1: + ( + sub1.start() + .register(topics=reg) + .wait_for_update(topic_name, 1) + .kill() + .wait_for_failure() + .start() + .register(topics=reg) + .wait_for_update(topic_name, 2) + ) + + with StatestoreSubscriber(update_cb=check_entries) as sub2: + ( + sub2.start() + .register(topics=reg) + .wait_for_update(topic_name, 2) + ) def test_heartbeat_failure_reset(self): """Regression test for IMPALA-6785: the heartbeat failure count for the subscriber ID should be reset when it resubscribes, not after the first successful heartbeat. Delay the heartbeat to force the topic update to finish first.""" - sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(0.5)) - topic_name = "test_heartbeat_failure_reset" - reg = TTopicRegistration(topic_name=topic_name, is_transient=True) - sub.start() - sub.register(topics=[reg]) - LOG.info("Registered with id {0}".format(sub.subscriber_id)) - sub.wait_for_heartbeat(1) - sub.kill() - LOG.info("Killed, waiting for statestore to detect failure via heartbeats") - sub.wait_for_failure() - # IMPALA-6785 caused only one topic update to be send. Wait for multiple updates to - # be received to confirm that the subsequent updates are being scheduled repeatedly. 
- target_updates = sub.update_counts[topic_name] + 5 - sub.start() - sub.register(topics=[reg]) - LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id)) - sub.wait_for_update(topic_name, target_updates) + with StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(0.5)) as sub: + topic_name = "test_heartbeat_failure_reset" + reg = TTopicRegistration(topic_name=topic_name, is_transient=True) + sub.start() + sub.register(topics=[reg]) + LOG.info("Registered with id {0}".format(sub.subscriber_id)) + sub.wait_for_heartbeat(1) + sub.kill() + LOG.info("Killed, waiting for statestore to detect failure via heartbeats") + sub.wait_for_failure() + # IMPALA-6785 caused only one topic update to be send. Wait for multiple updates to + # be received to confirm that the subsequent updates are being scheduled repeatedly. + target_updates = sub.update_counts[topic_name] + 5 + sub.start() + sub.register(topics=[reg]) + LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id)) + sub.wait_for_update(topic_name, target_updates) def test_min_subscriber_topic_version(self): self._do_test_min_subscriber_topic_version(False) @@ -716,19 +728,19 @@ class TestStatestore(): # version, the other which just consumes the updates. 
def producer_callback(sub, args): return callback(sub, args, True, "producer") def consumer_callback(sub, args): return callback(sub, args, False, "consumer") - consumer_sub = StatestoreSubscriber(update_cb=consumer_callback) - consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True) - producer_sub = StatestoreSubscriber(update_cb=producer_callback) - producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True, - populate_min_subscriber_topic_version=True) - NUM_UPDATES = 6 - ( - consumer_sub.start() - .register(topics=[consumer_reg]) - ) - ( - producer_sub.start() - .register(topics=[producer_reg]) - .wait_for_update(topic_name, NUM_UPDATES) - ) - consumer_sub.wait_for_update(topic_name, NUM_UPDATES) + with StatestoreSubscriber(update_cb=consumer_callback) as consumer_sub: + with StatestoreSubscriber(update_cb=producer_callback) as producer_sub: + consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True) + producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True, + populate_min_subscriber_topic_version=True) + NUM_UPDATES = 6 + ( + consumer_sub.start() + .register(topics=[consumer_reg]) + ) + ( + producer_sub.start() + .register(topics=[producer_reg]) + .wait_for_update(topic_name, NUM_UPDATES) + ) + consumer_sub.wait_for_update(topic_name, NUM_UPDATES)
