Repository: impala
Updated Branches:
  refs/heads/master f8b472ee6 -> e38715e25


IMPALA-7306: regression test for non-removed transient updates

Adds a test for IMPALA-7305 that reproduces the bug by delaying
heartbeats and updates.

Increased some timeouts in the test because they were hit
once after looping for ~12 hours.

Testing:
Manually reintroduced the bug by commenting out the code that
fixed it and confirmed that the test failed.

Change-Id: I6c2a39d8a76cb5371f394b5a97817d8231e473cc
Reviewed-on: http://gerrit.cloudera.org:8080/11470
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e38715e2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e38715e2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e38715e2

Branch: refs/heads/master
Commit: e38715e25297cc3643482be04e3b1b273e339b54
Parents: f8b472e
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Tue Sep 18 15:50:30 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Tue Sep 25 04:52:12 2018 +0000

----------------------------------------------------------------------
 tests/statestore/test_statestore.py | 89 ++++++++++++++++++++++++++++----
 1 file changed, 79 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e38715e2/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 23f8aa8..f6a6bb2 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -70,6 +70,9 @@ DEFAULT_UPDATE_STATE_RESPONSE = TUpdateStateResponse(status=STATUS_OK, topic_upd
 
 # IMPALA-3501: the timeout needs to be higher in code coverage builds
 WAIT_FOR_FAILURE_TIMEOUT = specific_build_type_timeout(40, 
code_coverage_build_timeout=60)
+WAIT_FOR_HEARTBEAT_TIMEOUT = specific_build_type_timeout(
+    40, code_coverage_build_timeout=60)
+WAIT_FOR_UPDATE_TIMEOUT = specific_build_type_timeout(40, 
code_coverage_build_timeout=60)
 
 class WildcardServerSocket(TSocket.TSocketBase, 
TTransport.TServerTransportBase):
   """Specialised server socket that binds to a random port at construction"""
@@ -292,10 +295,11 @@ class StatestoreSubscriber(object):
       while count > self.heartbeat_count:
         self.check_thread_exceptions()
         last_count = self.heartbeat_count
-        self.heartbeat_event.wait(10)
+        self.heartbeat_event.wait(WAIT_FOR_HEARTBEAT_TIMEOUT)
         if last_count == self.heartbeat_count:
-          raise Exception("Heartbeat not received within 10s (heartbeat count: 
%s)" %
-                          self.heartbeat_count)
+          raise Exception(
+              "Heartbeat not received within {0}s (heartbeat count: 
{1})".format(
+                WAIT_FOR_HEARTBEAT_TIMEOUT, self.heartbeat_count))
       self.check_thread_exceptions()
       return self
     finally:
@@ -313,11 +317,12 @@ class StatestoreSubscriber(object):
       while count > self.update_counts[topic_name]:
         self.check_thread_exceptions()
         last_count = self.update_counts[topic_name]
-        self.update_event.wait(10)
-        if (time.time() > start_time + 10 and
-            last_count == self.update_counts[topic_name]):
-          raise Exception("Update not received for %s within 10s (update 
count: %s)" %
-                          (topic_name, last_count))
+        self.update_event.wait(WAIT_FOR_UPDATE_TIMEOUT)
+        if (time.time() > start_time + WAIT_FOR_UPDATE_TIMEOUT and
+              last_count == self.update_counts[topic_name]):
+          raise Exception(
+              "Update not received for {0} within {1} (update count: 
{2})".format(
+                topic_name, WAIT_FOR_UPDATE_TIMEOUT, last_count))
       self.check_thread_exceptions()
       return self
     finally:
@@ -328,11 +333,13 @@ class StatestoreSubscriber(object):
     """Waits until this subscriber no longer appears in the statestore's 
subscriber
     list. If 'timeout' seconds pass, throws an exception."""
     start = time.time()
-    while time.time() - start < timeout:
+    while True:
       subs = [s["id"] for s in get_statestore_subscribers()["subscribers"]]
       if self.subscriber_id not in subs: return self
+      if time.time() - start > timeout:
+        raise Exception("Subscriber {0} did not fail in {1}s".format(
+            self.subscriber_id, timeout))
       time.sleep(0.2)
-    raise Exception("Subscriber %s did not fail in %ss" % (self.subscriber_id, 
timeout))
 
 class TestStatestore():
   def make_topic_update(self, topic_name, key_template="foo", 
value_template="bar",
@@ -764,3 +771,65 @@ class TestStatestore():
                       .wait_for_update(topic_name, NUM_UPDATES)
         )
         consumer_sub.wait_for_update(topic_name, NUM_UPDATES)
+
+  def test_transient_entry_removal_race(self):
+    """IMPALA-7306: transient entries were not deleted if the subscriber is 
unregistered
+    while it is in the middle of a callback. This test exercises that case by 
blocking
+    the update callback so that it is still running when the statestore 
unregisters the
+    subscriber for failed heartbeats. It also confirms that non-transient 
entries are not
+    removed."""
+    transient_topic_name = "test_transient_entry_removal_race_transient"
+    non_transient_topic_name = 
"test_transient_entry_removal_race_non_transient"
+    topic_regs = [TTopicRegistration(topic_name=transient_topic_name, 
is_transient=True),
+          TTopicRegistration(topic_name=non_transient_topic_name, 
is_transient=False)]
+    # The heartbeat timeout is 3s, so sleep for long enough for it to expire
+    HEARTBEAT_DELAY = 10
+
+    def delayed_heartbeat(sub, args):
+      LOG.info("Heartbeat callback called")
+      time.sleep(HEARTBEAT_DELAY)
+      LOG.debug("Heartbeat callback about to return")
+
+    def add_transient_entries_after_hb_failure(sub, args):
+      LOG.info("Update callback called")
+      # Add an additional delay so that this returns after the heartbeat.
+      time.sleep(WAIT_FOR_FAILURE_TIMEOUT)
+      updates = [self.make_topic_update(transient_topic_name, "k", "v"),
+                 self.make_topic_update(non_transient_topic_name, "k", "v")]
+      LOG.debug("Update callback about to return")
+      return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates, 
skipped=False)
+
+    # Subscriber with delay creates a transient entry, which should not be 
added since
+    # the subscriber failed and was unregistered.
+    with StatestoreSubscriber(heartbeat_cb=delayed_heartbeat,
+                              
update_cb=add_transient_entries_after_hb_failure) as sub:
+      # Wait for the first update (which should happen after failure), then 
confirm
+      # that the failure occurred.
+      (
+        sub.start()
+           .register(topics=topic_regs)
+           .wait_for_update(transient_topic_name, 1)
+           .wait_for_failure(timeout=WAIT_FOR_FAILURE_TIMEOUT)
+       )
+
+    def verify_transient_entry_removed(sub, args):
+      transient_delta = args.topic_deltas[transient_topic_name]
+      assert len(transient_delta.topic_entries) == 0, args
+      non_transient_delta = args.topic_deltas[non_transient_topic_name]
+      # Non-transient update should include topic that was not removed
+      assert len(non_transient_delta.topic_entries) == 1, args
+      entry = non_transient_delta.topic_entries[0]
+      assert entry.key == "k0"
+      assert entry.value == "v0"
+      assert not entry.deleted
+      # Skip updates so that statestore will re-send non-transient entries and 
the above
+      # assertions remain valid on subsequent callbacks.
+      return TUpdateStateResponse(status=STATUS_OK, topic_updates=[], 
skipped=True)
+
+    # Verify that the transient entry for the failed subscriber is not present.
+    with StatestoreSubscriber(update_cb=verify_transient_entry_removed) as sub:
+      (
+        sub.start()
+           .register(topics=topic_regs)
+           .wait_for_update(transient_topic_name, 1)
+       )

Reply via email to