Repository: impala
Updated Branches:
  refs/heads/master f8b472ee6 -> e38715e25
IMPALA-7306: regression test for non-removed transient updates

Adds a test for IMPALA-7305 that reproduces the bug by delaying
heartbeats and updates.

Increased some timeouts in the test because they were hit once
after looping for ~12 hours.

Testing:
Manually reintroduced the bug by commenting out the code that fixed it
and confirmed that the test failed.

Change-Id: I6c2a39d8a76cb5371f394b5a97817d8231e473cc
Reviewed-on: http://gerrit.cloudera.org:8080/11470
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e38715e2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e38715e2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e38715e2

Branch: refs/heads/master
Commit: e38715e25297cc3643482be04e3b1b273e339b54
Parents: f8b472e
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Tue Sep 18 15:50:30 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Tue Sep 25 04:52:12 2018 +0000

----------------------------------------------------------------------
 tests/statestore/test_statestore.py | 89 ++++++++++++++++++++++++++++----
 1 file changed, 79 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/impala/blob/e38715e2/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 23f8aa8..f6a6bb2 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -70,6 +70,9 @@ DEFAULT_UPDATE_STATE_RESPONSE = TUpdateStateResponse(status=STATUS_OK, topic_upd
 # IMPALA-3501: the timeout needs to be higher in code coverage builds
 WAIT_FOR_FAILURE_TIMEOUT = 
specific_build_type_timeout(40, code_coverage_build_timeout=60) +WAIT_FOR_HEARTBEAT_TIMEOUT = specific_build_type_timeout( + 40, code_coverage_build_timeout=60) +WAIT_FOR_UPDATE_TIMEOUT = specific_build_type_timeout(40, code_coverage_build_timeout=60) class WildcardServerSocket(TSocket.TSocketBase, TTransport.TServerTransportBase): """Specialised server socket that binds to a random port at construction""" @@ -292,10 +295,11 @@ class StatestoreSubscriber(object): while count > self.heartbeat_count: self.check_thread_exceptions() last_count = self.heartbeat_count - self.heartbeat_event.wait(10) + self.heartbeat_event.wait(WAIT_FOR_HEARTBEAT_TIMEOUT) if last_count == self.heartbeat_count: - raise Exception("Heartbeat not received within 10s (heartbeat count: %s)" % - self.heartbeat_count) + raise Exception( + "Heartbeat not received within {0}s (heartbeat count: {1})".format( + WAIT_FOR_HEARTBEAT_TIMEOUT, self.heartbeat_count)) self.check_thread_exceptions() return self finally: @@ -313,11 +317,12 @@ class StatestoreSubscriber(object): while count > self.update_counts[topic_name]: self.check_thread_exceptions() last_count = self.update_counts[topic_name] - self.update_event.wait(10) - if (time.time() > start_time + 10 and - last_count == self.update_counts[topic_name]): - raise Exception("Update not received for %s within 10s (update count: %s)" % - (topic_name, last_count)) + self.update_event.wait(WAIT_FOR_UPDATE_TIMEOUT) + if (time.time() > start_time + WAIT_FOR_UPDATE_TIMEOUT and + last_count == self.update_counts[topic_name]): + raise Exception( + "Update not received for {0} within {1} (update count: {2})".format( + topic_name, WAIT_FOR_UPDATE_TIMEOUT, last_count)) self.check_thread_exceptions() return self finally: @@ -328,11 +333,13 @@ class StatestoreSubscriber(object): """Waits until this subscriber no longer appears in the statestore's subscriber list. 
If 'timeout' seconds pass, throws an exception.""" start = time.time() - while time.time() - start < timeout: + while True: subs = [s["id"] for s in get_statestore_subscribers()["subscribers"]] if self.subscriber_id not in subs: return self + if time.time() - start > timeout: + raise Exception("Subscriber {0} did not fail in {1}s".format( + self.subscriber_id, timeout)) time.sleep(0.2) - raise Exception("Subscriber %s did not fail in %ss" % (self.subscriber_id, timeout)) class TestStatestore(): def make_topic_update(self, topic_name, key_template="foo", value_template="bar", @@ -764,3 +771,65 @@ class TestStatestore(): .wait_for_update(topic_name, NUM_UPDATES) ) consumer_sub.wait_for_update(topic_name, NUM_UPDATES) + + def test_transient_entry_removal_race(self): + """IMPALA-7306: transient entries were not deleted if the subscriber is unregistered + while it is in the middle of a callback. This test exercises that case by blocking + the update callback so that it is still running when the statestore unregisters the + subscriber for failed heartbeats. It also confirms that non-transient entries are not + removed.""" + transient_topic_name = "test_transient_entry_removal_race_transient" + non_transient_topic_name = "test_transient_entry_removal_race_non_transient" + topic_regs = [TTopicRegistration(topic_name=transient_topic_name, is_transient=True), + TTopicRegistration(topic_name=non_transient_topic_name, is_transient=False)] + # The heartbeat timeout is 3s, so sleep for long enough for it to expire + HEARTBEAT_DELAY = 10 + + def delayed_heartbeat(sub, args): + LOG.info("Heartbeat callback called") + time.sleep(HEARTBEAT_DELAY) + LOG.debug("Heartbeat callback about to return") + + def add_transient_entries_after_hb_failure(sub, args): + LOG.info("Update callback called") + # Add an additional delay so that this returns after the heartbeat. 
+ time.sleep(WAIT_FOR_FAILURE_TIMEOUT) + updates = [self.make_topic_update(transient_topic_name, "k", "v"), + self.make_topic_update(non_transient_topic_name, "k", "v")] + LOG.debug("Update callback about to return") + return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates, skipped=False) + + # Subscriber with delay creates a transient entry, which should not be added since + # the subscriber failed and was unregistered. + with StatestoreSubscriber(heartbeat_cb=delayed_heartbeat, + update_cb=add_transient_entries_after_hb_failure) as sub: + # Wait for the first update (which should happen after failure), then confirm + # that the failure occurred. + ( + sub.start() + .register(topics=topic_regs) + .wait_for_update(transient_topic_name, 1) + .wait_for_failure(timeout=WAIT_FOR_FAILURE_TIMEOUT) + ) + + def verify_transient_entry_removed(sub, args): + transient_delta = args.topic_deltas[transient_topic_name] + assert len(transient_delta.topic_entries) == 0, args + non_transient_delta = args.topic_deltas[non_transient_topic_name] + # Non-transient update should include topic that was not removed + assert len(non_transient_delta.topic_entries) == 1, args + entry = non_transient_delta.topic_entries[0] + assert entry.key == "k0" + assert entry.value == "v0" + assert not entry.deleted + # Skip updates so that statestore will re-send non-transient entries and the above + # assertions remain valid on subsequent callbacks. + return TUpdateStateResponse(status=STATUS_OK, topic_updates=[], skipped=True) + + # Verify that the transient entry for the failed subscriber is not present. + with StatestoreSubscriber(update_cb=verify_transient_entry_removed) as sub: + ( + sub.start() + .register(topics=topic_regs) + .wait_for_update(transient_topic_name, 1) + )