IMPALA-7775: fix some lifecycle issues in statestore/session tests

Threads from the Statestore's thread pools kept running after a
test finished and could dereference invalid memory.
We make sure these threads are cleaned up before moving on to
the next test. Note that we don't clean up all background
threads, just the ones that caused issues here.

I refactored the memory management a bit to put all objects
that we can't safely free into a single ObjectPool.

The statestore tests also had an issue with the lifetime of the
string flags FLAGS_ssl_*_certificate. Those were overwritten
with new values while the thread pool threads were still running,
which could cause use-after-free bugs.

Testing:
Looped the tests under ASAN with the "stress" utility running at the
same time to flush out races.

Ran core tests.

Change-Id: I3b25c8b8a96bfa1183ce273b3bb4debde234dd01
Reviewed-on: http://gerrit.cloudera.org:8080/11864
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e5d07579
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e5d07579
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e5d07579

Branch: refs/heads/branch-3.1.0
Commit: e5d0757983802fbe8a0f473bae8ff2d022fc0117
Parents: 0d4c6ae
Author: Tim Armstrong <[email protected]>
Authored: Thu Nov 1 12:00:37 2018 -0700
Committer: Zoltan Borok-Nagy <[email protected]>
Committed: Tue Nov 13 12:50:23 2018 +0100

----------------------------------------------------------------------
 be/src/service/session-expiry-test.cc | 13 ++++-
 be/src/statestore/statestore-test.cc  | 89 ++++++++++++++++++------------
 be/src/statestore/statestore.cc       | 11 ++++
 be/src/statestore/statestore.h        |  7 ++-
 4 files changed, 81 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e5d07579/be/src/service/session-expiry-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/session-expiry-test.cc 
b/be/src/service/session-expiry-test.cc
index 89ae842..7e5b86a 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -44,14 +44,20 @@ DECLARE_int32(beeswax_port);
 // TODO: Come up with a short-running test that confirms a session will keep 
itself alive
 // that doesn't depend upon being rescheduled in a timely fashion.
 
+// Object pool containing all objects that must live for the duration of the 
process.
+// E.g. objects that are singletons and never destroyed in a real daemon (so 
don't support
+// tear-down logic), but which we create multiple times in unit tests. We leak 
this pool
+// instead of destroying it to avoid destroying the contained objects.
+static ObjectPool* perm_objects;
+
 TEST(SessionTest, TestExpiry) {
   const int NUM_SESSIONS = 5;
   const int MAX_IDLE_TIMEOUT_MS = 4000;
   FLAGS_idle_session_timeout = 1;
   // Skip validation checks for in-process backend.
   FLAGS_abort_on_config_error = false;
-  scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
-  Statestore* statestore = new Statestore(metrics.get());
+  MetricGroup* metrics = perm_objects->Add(new MetricGroup("statestore"));
+  Statestore* statestore = perm_objects->Add(new Statestore(metrics));
   IGNORE_LEAKING_OBJECT(statestore);
   // Pass in 0 to have the statestore use an ephemeral port for the service.
   ABORT_IF_ERROR(statestore->Init(0));
@@ -111,11 +117,14 @@ TEST(SessionTest, TestExpiry) {
   // work). Sleep to allow the threads closing the session to complete before 
tearing down
   // the server.
   SleepForMs(1000);
+  statestore->ShutdownForTesting();
 }
 
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   InitFeSupport();
+  perm_objects = new ObjectPool;
+  IGNORE_LEAKING_OBJECT(perm_objects);
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e5d07579/be/src/statestore/statestore-test.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-test.cc 
b/be/src/statestore/statestore-test.cc
index a9ee095..f517a56 100644
--- a/be/src/statestore/statestore-test.cc
+++ b/be/src/statestore/statestore-test.cc
@@ -34,77 +34,94 @@ DECLARE_int32(state_store_port);
 
 namespace impala {
 
+// Object pool containing all objects that must live for the duration of the 
process.
+// E.g. objects that are singletons and never destroyed in a real daemon (so 
don't support
+// tear-down logic), but which we create multiple times in unit tests. We leak 
this pool
+// instead of destroying it to avoid destroying the contained objects.
+static ObjectPool* perm_objects;
+
 TEST(StatestoreTest, SmokeTest) {
   // All allocations done by 'new' to avoid problems shutting down Thrift 
servers
   // gracefully.
-  scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
-  Statestore* statestore = new Statestore(metrics.get());
+  MetricGroup* metrics = perm_objects->Add(new MetricGroup("statestore"));
+  Statestore* statestore = perm_objects->Add(new Statestore(metrics));
   // Thrift will internally pick an ephemeral port if we pass in 0 as the port.
   int statestore_port = 0;
-  IGNORE_LEAKING_OBJECT(statestore);
   ASSERT_OK(statestore->Init(statestore_port));
 
-  scoped_ptr<MetricGroup> metrics_2(new MetricGroup("statestore_2"));
+  MetricGroup* metrics_2 = perm_objects->Add(new MetricGroup("statestore_2"));
   // Port already in use
-  Statestore* statestore_wont_start = new Statestore(metrics_2.get());
+  Statestore* statestore_wont_start = perm_objects->Add(new 
Statestore(metrics_2));
   ASSERT_FALSE(statestore_wont_start->Init(statestore->port()).ok());
 
-  StatestoreSubscriber* sub_will_start =
+  StatestoreSubscriber* sub_will_start = perm_objects->Add(
       new StatestoreSubscriber("sub1", MakeNetworkAddress("localhost", 0),
-          MakeNetworkAddress("localhost", statestore->port()), new 
MetricGroup(""));
-  IGNORE_LEAKING_OBJECT(sub_will_start);
+          MakeNetworkAddress("localhost", statestore->port()), new 
MetricGroup("")));
   ASSERT_OK(sub_will_start->Start());
 
   // Confirm that a subscriber trying to use an in-use port will fail to start.
-  StatestoreSubscriber* sub_will_not_start = new StatestoreSubscriber("sub3",
-      MakeNetworkAddress("localhost", sub_will_start->heartbeat_port()),
-      MakeNetworkAddress("localhost", statestore->port()), new 
MetricGroup(""));
-  IGNORE_LEAKING_OBJECT(sub_will_not_start);
+  StatestoreSubscriber* sub_will_not_start = perm_objects->Add(new 
StatestoreSubscriber(
+      "sub3", MakeNetworkAddress("localhost", 
sub_will_start->heartbeat_port()),
+      MakeNetworkAddress("localhost", statestore->port()), new 
MetricGroup("")));
   ASSERT_FALSE(sub_will_not_start->Start().ok());
+
+  statestore->ShutdownForTesting();
 }
 
-TEST(StatestoreSslTest, SmokeTest) {
+// Runs an SSL smoke test with provided parameters.
+void SslSmokeTestHelper(const string& server_ca_certificate,
+    const string& client_ca_certificate, bool sub_should_start) {
   string impala_home(getenv("IMPALA_HOME"));
   stringstream server_cert;
   server_cert << impala_home << "/be/src/testutil/server-cert.pem";
-  FLAGS_ssl_server_certificate = server_cert.str();
-  FLAGS_ssl_client_ca_certificate = server_cert.str();
+  // Override flags for the duration of this test. Modifying them while the 
statestore
+  // is running is unsafe.
+  FLAGS_ssl_server_certificate = server_ca_certificate;
+  FLAGS_ssl_client_ca_certificate = client_ca_certificate;
   stringstream server_key;
   server_key << impala_home << "/be/src/testutil/server-key.pem";
   FLAGS_ssl_private_key = server_key.str();
 
   // Thrift will internally pick an ephemeral port if we pass in 0 as the port.
   int statestore_port = 0;
-  scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
-  Statestore* statestore = new Statestore(metrics.get());
-  IGNORE_LEAKING_OBJECT(statestore);
+  MetricGroup* metrics = perm_objects->Add(new MetricGroup("statestore"));
+  Statestore* statestore = perm_objects->Add(new Statestore(metrics));
   ASSERT_OK(statestore->Init(statestore_port));
 
-  StatestoreSubscriber* sub_will_start = new StatestoreSubscriber("smoke_sub1",
-      MakeNetworkAddress("localhost", 0),
-      MakeNetworkAddress("localhost", statestore->port()), new 
MetricGroup(""));
-  IGNORE_LEAKING_OBJECT(sub_will_start);
-  ASSERT_OK(sub_will_start->Start());
+  StatestoreSubscriber* sub = perm_objects->Add(
+      new StatestoreSubscriber("smoke_sub", MakeNetworkAddress("localhost", 0),
+          MakeNetworkAddress("localhost", statestore->port()), new 
MetricGroup("")));
+  Status sub_status = sub->Start();
+  ASSERT_EQ(sub_should_start, sub_status.ok());
 
-  stringstream invalid_server_cert;
-  invalid_server_cert << impala_home << 
"/be/src/testutil/invalid-server-cert.pem";
-  FLAGS_ssl_client_ca_certificate = invalid_server_cert.str();
+  statestore->ShutdownForTesting();
+}
 
-  StatestoreSubscriber* sub_will_not_start = new 
StatestoreSubscriber("smoke_sub2",
-      MakeNetworkAddress("localhost", 0),
-      MakeNetworkAddress("localhost", statestore->port()), new 
MetricGroup(""));
-  IGNORE_LEAKING_OBJECT(sub_will_not_start);
-  ASSERT_FALSE(sub_will_not_start->Start().ok());
+string GetValidServerCert() {
+  string impala_home(getenv("IMPALA_HOME"));
+  stringstream server_cert;
+  server_cert << impala_home << "/be/src/testutil/server-cert.pem";
+  return server_cert.str();
+}
+
+TEST(StatestoreSslTest, ValidCertSmokeTest) {
+  string valid_cert = GetValidServerCert();
+  SslSmokeTestHelper(valid_cert, valid_cert, true);
 }
 
+TEST(StatestoreSslTest, InvalidCertSmokeTest) {
+  string impala_home(getenv("IMPALA_HOME"));
+  stringstream invalid_server_cert;
+  invalid_server_cert << impala_home << 
"/be/src/testutil/invalid-server-cert.pem";
+  SslSmokeTestHelper(GetValidServerCert(), invalid_server_cert.str(), false);
 }
 
+} // namespace impala
+
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
-  int rc = RUN_ALL_TESTS();
-  // IMPALA-5291: statestore services and subscribers may still be running at 
this point
-  // and accessing global state. Exit without running global destructors to 
avoid
-  // races with other threads when tearing down the proces.
-  _exit(rc);
+  perm_objects = new ObjectPool;
+  IGNORE_LEAKING_OBJECT(perm_objects);
+  return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e5d07579/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 4e63dad..70183d7 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -36,6 +36,7 @@
 #include "util/debug-util.h"
 #include "util/logging-support.h"
 #include "util/openssl-util.h"
+#include "util/test-info.h"
 #include "util/time.h"
 #include "util/uid-util.h"
 #include "util/webserver.h"
@@ -1050,3 +1051,13 @@ void Statestore::MainLoop() {
   subscriber_priority_topic_update_threadpool_.Join();
   subscriber_heartbeat_threadpool_.Join();
 }
+
+void Statestore::ShutdownForTesting() {
+  CHECK(TestInfo::is_be_test()) << "Only valid to call in backend tests.";
+  subscriber_topic_update_threadpool_.Shutdown();
+  subscriber_priority_topic_update_threadpool_.Shutdown();
+  subscriber_heartbeat_threadpool_.Shutdown();
+  subscriber_topic_update_threadpool_.Join();
+  subscriber_priority_topic_update_threadpool_.Join();
+  subscriber_heartbeat_threadpool_.Join();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/e5d07579/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 52f7d68..871494c 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -157,6 +157,11 @@ class Statestore : public CacheLineAligned {
   /// The main processing loop. Runs infinitely.
   void MainLoop();
 
+  /// Shut down some background threads. Only used for testing. Note that this 
is not
+  /// a clean shutdown because we can't correctly tear down 'thrift_server_', 
so
+  /// not all background threads are stopped and this object cannot be 
destroyed.
+  void ShutdownForTesting();
+
   /// Returns the Thrift API interface that proxies requests onto the local 
Statestore.
   const boost::shared_ptr<StatestoreServiceIf>& thrift_iface() const {
     return thrift_iface_;
@@ -713,6 +718,6 @@ class Statestore : public CacheLineAligned {
   [[noreturn]] void MonitorSubscriberHeartbeat();
 };
 
-}
+} // namespace impala
 
 #endif

Reply via email to