IMPALA-4784: Remove InProcessStatestore InProcessStatestore was only used by statestore-test, expr-test and session-expiry-test. With a slight refactor of the Statestore class, InProcessStatestore becomes obsolete.
This patch moves the ownership of the ThriftServer into the Statestore class and Statestore::Init() now takes a 'port' parameter instead of using the FLAGS_state_store_port directly. We also remove the InProcessStatestore completely. A follow on patch will get rid of the InProcessImpalaServer too (IMPALA-6013) Testing: Ran 'core' tests. Change-Id: I2621873e593b36c9612a6402ac6c5d8e3b49cde9 Reviewed-on: http://gerrit.cloudera.org:8080/10843 Reviewed-by: Sailesh Mukil <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0a470168 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0a470168 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0a470168 Branch: refs/heads/master Commit: 0a470168138b5f3254d7604a120eb2376a91c20c Parents: f3b1c4b Author: Sailesh Mukil <[email protected]> Authored: Tue Jun 26 10:15:26 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Jul 3 08:21:48 2018 +0000 ---------------------------------------------------------------------- be/src/exprs/expr-test.cc | 23 +++++++++++----- be/src/service/session-expiry-test.cc | 11 +++++--- be/src/statestore/statestore-test.cc | 43 +++++++++++++++++++----------- be/src/statestore/statestore.cc | 31 ++++++++++++++++++++- be/src/statestore/statestore.h | 14 ++++++++-- be/src/statestore/statestored-main.cc | 24 +---------------- be/src/testutil/in-process-servers.cc | 36 ------------------------- be/src/testutil/in-process-servers.h | 36 ------------------------- 8 files changed, 94 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/exprs/expr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc index 999f41a..e03f458 100644 --- a/be/src/exprs/expr-test.cc +++ b/be/src/exprs/expr-test.cc @@ -28,7 +28,6 @@ #include <boost/scoped_ptr.hpp> #include <boost/unordered_map.hpp> -#include "testutil/gtest-util.h" #include "codegen/llvm-codegen.h" #include "common/init.h" #include "common/object-pool.h" @@ -36,33 +35,36 @@ #include "exprs/like-predicate.h" #include "exprs/literal.h" #include "exprs/null-literal.h" -#include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" +#include "exprs/scalar-expr.h" #include "exprs/string-functions.h" #include "exprs/timestamp-functions.h" #include "exprs/timezone_db.h" #include "gen-cpp/Exprs_types.h" +#include "gen-cpp/ImpalaInternalService_types.h" #include "gen-cpp/hive_metastore_types.h" #include "rpc/thrift-client.h" #include "rpc/thrift-server.h" -#include "runtime/runtime-state.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.inline.h" +#include "runtime/runtime-state.h" #include "runtime/string-value.h" #include "runtime/timestamp-parse-util.h" #include "runtime/timestamp-value.h" #include "runtime/timestamp-value.inline.h" #include "service/fe-support.h" #include "service/impala-server.h" +#include "statestore/statestore.h" +#include "testutil/gtest-util.h" #include "testutil/impalad-query-executor.h" #include "testutil/in-process-servers.h" #include "udf/udf-test-harness.h" +#include "util/asan.h" #include "util/debug-util.h" #include "util/string-parser.h" #include "util/string-util.h" #include "util/test-info.h" -#include "gen-cpp/ImpalaInternalService_types.h" #include "common/names.h" @@ -83,6 +85,8 @@ using namespace impala; namespace impala { ImpaladQueryExecutor* executor_; +scoped_ptr<MetricGroup> statestore_metrics(new MetricGroup("statestore_metrics")); +Statestore* statestore; bool disable_codegen_; bool enable_expr_rewrites_; @@ -8798,11 +8802,16 @@ int main(int argc, char** argv) { FLAGS_abort_on_config_error = false; VLOG_CONNECTION << "creating test env"; VLOG_CONNECTION << "starting backends"; - InProcessStatestore* ips; - ABORT_IF_ERROR(InProcessStatestore::StartWithEphemeralPorts(&ips)); + statestore = new Statestore(statestore_metrics.get()); + IGNORE_LEAKING_OBJECT(statestore); + + // Pass in 0 to have the statestore use an ephemeral port for the service. + ABORT_IF_ERROR(statestore->Init(0)); InProcessImpalaServer* impala_server; ABORT_IF_ERROR(InProcessImpalaServer::StartWithEphemeralPorts( - FLAGS_hostname, ips->port(), &impala_server)); + FLAGS_hostname, statestore->port(), &impala_server)); + IGNORE_LEAKING_OBJECT(impala_server); + executor_ = new ImpaladQueryExecutor(FLAGS_hostname, impala_server->GetBeeswaxPort()); ABORT_IF_ERROR(executor_->Setup()); http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/service/session-expiry-test.cc ---------------------------------------------------------------------- diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc index b227c1f..89ae842 100644 --- a/be/src/service/session-expiry-test.cc +++ b/be/src/service/session-expiry-test.cc @@ -21,8 +21,10 @@ #include "rpc/thrift-client.h" #include "service/fe-support.h" #include "service/impala-server.h" +#include "statestore/statestore.h" #include "testutil/gtest-util.h" #include "testutil/in-process-servers.h" +#include "util/asan.h" #include "util/impalad-metrics.h" #include "util/time.h" @@ -48,11 +50,14 @@ TEST(SessionTest, TestExpiry) { FLAGS_idle_session_timeout = 1; // Skip validation checks for in-process backend. FLAGS_abort_on_config_error = false; - InProcessStatestore* ips; - ASSERT_OK(InProcessStatestore::StartWithEphemeralPorts(&ips)); + scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore")); + Statestore* statestore = new Statestore(metrics.get()); + IGNORE_LEAKING_OBJECT(statestore); + // Pass in 0 to have the statestore use an ephemeral port for the service. + ABORT_IF_ERROR(statestore->Init(0)); InProcessImpalaServer* impala; ASSERT_OK(InProcessImpalaServer::StartWithEphemeralPorts( - "localhost", ips->port(), &impala)); + "localhost", statestore->port(), &impala)); IntCounter* expired_metric = impala->metrics()->FindMetricForTesting<IntCounter>( ImpaladMetricKeys::NUM_SESSIONS_EXPIRED); http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestore-test.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore-test.cc b/be/src/statestore/statestore-test.cc index b481a63..a9ee095 100644 --- a/be/src/statestore/statestore-test.cc +++ b/be/src/statestore/statestore-test.cc @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include "testutil/gtest-util.h" -#include "testutil/in-process-servers.h" #include "common/init.h" -#include "util/metrics.h" #include "statestore/statestore-subscriber.h" +#include "testutil/gtest-util.h" +#include "util/asan.h" +#include "util/metrics.h" #include "common/names.h" @@ -37,23 +37,29 @@ namespace impala { TEST(StatestoreTest, SmokeTest) { // All allocations done by 'new' to avoid problems shutting down Thrift servers // gracefully. - InProcessStatestore* ips; - ASSERT_OK(InProcessStatestore::StartWithEphemeralPorts(&ips)); - ASSERT_TRUE(ips != NULL) << "Could not start Statestore"; + scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore")); + Statestore* statestore = new Statestore(metrics.get()); + // Thrift will internally pick an ephemeral port if we pass in 0 as the port. + int statestore_port = 0; + IGNORE_LEAKING_OBJECT(statestore); + ASSERT_OK(statestore->Init(statestore_port)); + + scoped_ptr<MetricGroup> metrics_2(new MetricGroup("statestore_2")); // Port already in use - InProcessStatestore* statestore_wont_start = - new InProcessStatestore(ips->port(), ips->port() + 10); - ASSERT_FALSE(statestore_wont_start->Start().ok()); + Statestore* statestore_wont_start = new Statestore(metrics_2.get()); + ASSERT_FALSE(statestore_wont_start->Init(statestore->port()).ok()); - StatestoreSubscriber* sub_will_start = new StatestoreSubscriber("sub1", - MakeNetworkAddress("localhost", 0), - MakeNetworkAddress("localhost", ips->port()), new MetricGroup("")); + StatestoreSubscriber* sub_will_start = + new StatestoreSubscriber("sub1", MakeNetworkAddress("localhost", 0), + MakeNetworkAddress("localhost", statestore->port()), new MetricGroup("")); + IGNORE_LEAKING_OBJECT(sub_will_start); ASSERT_OK(sub_will_start->Start()); // Confirm that a subscriber trying to use an in-use port will fail to start. StatestoreSubscriber* sub_will_not_start = new StatestoreSubscriber("sub3", MakeNetworkAddress("localhost", sub_will_start->heartbeat_port()), - MakeNetworkAddress("localhost", ips->port()), new MetricGroup("")); + MakeNetworkAddress("localhost", statestore->port()), new MetricGroup("")); + IGNORE_LEAKING_OBJECT(sub_will_not_start); ASSERT_FALSE(sub_will_not_start->Start().ok()); } @@ -67,13 +73,17 @@ TEST(StatestoreSslTest, SmokeTest) { server_key << impala_home << "/be/src/testutil/server-key.pem"; FLAGS_ssl_private_key = server_key.str(); - InProcessStatestore* statestore; - ASSERT_OK(InProcessStatestore::StartWithEphemeralPorts(&statestore)); - if (statestore == NULL) FAIL() << "Unable to start Statestore"; + // Thrift will internally pick an ephemeral port if we pass in 0 as the port. + int statestore_port = 0; + scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore")); + Statestore* statestore = new Statestore(metrics.get()); + IGNORE_LEAKING_OBJECT(statestore); + ASSERT_OK(statestore->Init(statestore_port)); StatestoreSubscriber* sub_will_start = new StatestoreSubscriber("smoke_sub1", MakeNetworkAddress("localhost", 0), MakeNetworkAddress("localhost", statestore->port()), new MetricGroup("")); + IGNORE_LEAKING_OBJECT(sub_will_start); ASSERT_OK(sub_will_start->Start()); stringstream invalid_server_cert; @@ -83,6 +93,7 @@ TEST(StatestoreSslTest, SmokeTest) { StatestoreSubscriber* sub_will_not_start = new StatestoreSubscriber("smoke_sub2", MakeNetworkAddress("localhost", 0), MakeNetworkAddress("localhost", statestore->port()), new MetricGroup("")); + IGNORE_LEAKING_OBJECT(sub_will_not_start); ASSERT_FALSE(sub_will_not_start->Start().ok()); } http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestore.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index a58aec1..a208d97 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -28,6 +28,7 @@ #include "common/status.h" #include "gen-cpp/StatestoreService_types.h" +#include "rpc/rpc-trace.h" #include "rpc/thrift-util.h" #include "statestore/failure-detector.h" #include "statestore/statestore-subscriber-client-wrapper.h" @@ -98,6 +99,12 @@ DEFINE_int32(statestore_update_tcp_timeout_seconds, 300, "(Advanced) The time af "badly hung machines that are not able to respond to the update RPC in short " "order."); +DECLARE_string(ssl_server_certificate); +DECLARE_string(ssl_private_key); +DECLARE_string(ssl_private_key_password_cmd); +DECLARE_string(ssl_cipher_list); +DECLARE_string(ssl_minimum_version); + // Metric keys // TODO: Replace 'backend' with 'subscriber' when we can coordinate a change with CM const string STATESTORE_LIVE_SUBSCRIBERS = "statestore.live-backends"; @@ -408,6 +415,7 @@ Statestore::Statestore(MetricGroup* metrics) FLAGS_statestore_max_missed_heartbeats / 2)) { DCHECK(metrics != NULL); + metrics_ = metrics; num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0); subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics, STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>()); @@ -426,10 +434,31 @@ Statestore::Statestore(MetricGroup* metrics) heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat"); } -Status Statestore::Init() { +Status Statestore::Init(int32_t state_store_port) { + boost::shared_ptr<TProcessor> processor(new StatestoreServiceProcessor(thrift_iface())); + boost::shared_ptr<TProcessorEventHandler> event_handler( + new RpcEventHandler("statestore", metrics_)); + processor->setEventHandler(event_handler); + ThriftServerBuilder builder("StatestoreService", processor, state_store_port); + if (IsInternalTlsConfigured()) { + SSLProtocol ssl_version; + RETURN_IF_ERROR( + SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version)); + LOG(INFO) << "Enabling SSL for Statestore"; + builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key) + .pem_password_cmd(FLAGS_ssl_private_key_password_cmd) + .ssl_version(ssl_version) + .cipher_list(FLAGS_ssl_cipher_list); + } + ThriftServer* server; + RETURN_IF_ERROR(builder.metrics(metrics_).Build(&server)); + thrift_server_.reset(server); + RETURN_IF_ERROR(thrift_server_->Start()); + RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init()); RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init()); RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init()); + return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestore.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index 71e1ade..1d7f1a2 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -123,9 +123,10 @@ class Statestore : public CacheLineAligned { /// The only constructor; initialises member variables only. Statestore(MetricGroup* metrics); + /// Initialize and start the backing ThriftServer with port 'state_store_port'. /// Initialize the ThreadPools used for updates and heartbeats. Returns an error if - /// ThreadPool initialization fails. - Status Init() WARN_UNUSED_RESULT; + /// any of the above initialization fails. + Status Init(int32_t state_store_port) WARN_UNUSED_RESULT; /// Registers a new subscriber with the given unique subscriber ID, running a subscriber /// service at the given location, with the provided list of topic subscriptions. @@ -158,6 +159,9 @@ class Statestore : public CacheLineAligned { static const std::string IMPALA_MEMBERSHIP_TOPIC; /// Topic tracking the state of admission control on all coordinators. static const std::string IMPALA_REQUEST_QUEUE_TOPIC; + + int32_t port() { return thrift_server_->port(); } + private: /// A TopicEntry is a single entry in a topic, and logically is a <string, byte string> /// pair. @@ -526,6 +530,12 @@ class Statestore : public CacheLineAligned { /// of time. boost::scoped_ptr<StatestoreSubscriberClientCache> heartbeat_client_cache_; + /// Container for the internal statestore service. + boost::scoped_ptr<ThriftServer> thrift_server_; + + /// Pointer to the MetricGroup for this statestore. Not owned. + MetricGroup* metrics_; + /// Thrift API implementation which proxies requests onto this Statestore boost::shared_ptr<StatestoreServiceIf> thrift_iface_; http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestored-main.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc index 633d449..d7db794 100644 --- a/be/src/statestore/statestored-main.cc +++ b/be/src/statestore/statestored-main.cc @@ -31,7 +31,6 @@ #include "util/common-metrics.h" #include "util/debug-util.h" #include "util/metrics.h" -#include "util/openssl-util.h" #include "util/memory-metrics.h" #include "util/webserver.h" #include "util/default-path-handlers.h" @@ -39,12 +38,6 @@ DECLARE_int32(state_store_port); DECLARE_int32(webserver_port); DECLARE_bool(enable_webserver); -DECLARE_string(principal); -DECLARE_string(ssl_server_certificate); -DECLARE_string(ssl_private_key); -DECLARE_string(ssl_private_key_password_cmd); -DECLARE_string(ssl_cipher_list); -DECLARE_string(ssl_minimum_version); #include "common/names.h" @@ -82,7 +75,7 @@ int StatestoredMain(int argc, char** argv) { CommonMetrics::InitCommonMetrics(metrics.get()); Statestore statestore(metrics.get()); - ABORT_IF_ERROR(statestore.Init()); + ABORT_IF_ERROR(statestore.Init(FLAGS_state_store_port)); statestore.RegisterWebpages(webserver.get()); boost::shared_ptr<TProcessor> processor( new StatestoreServiceProcessor(statestore.thrift_iface())); @@ -90,21 +83,6 @@ int StatestoredMain(int argc, char** argv) { new RpcEventHandler("statestore", metrics.get())); processor->setEventHandler(event_handler); - ThriftServer* server; - ThriftServerBuilder builder("StatestoreService", processor, FLAGS_state_store_port); - if (IsInternalTlsConfigured()) { - SSLProtocol ssl_version; - ABORT_IF_ERROR( - SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version)); - LOG(INFO) << "Enabling SSL for Statestore"; - builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key) - .pem_password_cmd(FLAGS_ssl_private_key_password_cmd) - .ssl_version(ssl_version) - .cipher_list(FLAGS_ssl_cipher_list); - } - ABORT_IF_ERROR(builder.metrics(metrics.get()).Build(&server)); - ABORT_IF_ERROR(server->Start()); - statestore.MainLoop(); return 0; http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/testutil/in-process-servers.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc index 031a07e..8a786e0 100644 --- a/be/src/testutil/in-process-servers.cc +++ b/be/src/testutil/in-process-servers.cc @@ -105,39 +105,3 @@ int InProcessImpalaServer::GetBeeswaxPort() const { int InProcessImpalaServer::GetHS2Port() const { return impala_server_->GetHS2Port(); } - -Status InProcessStatestore::StartWithEphemeralPorts(InProcessStatestore** statestore) { - *statestore = new InProcessStatestore(0, 0); - return (*statestore)->Start(); -} - -InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port) - : webserver_(new Webserver(webserver_port)), - metrics_(new MetricGroup("statestore")), - statestore_port_(statestore_port), - statestore_(new Statestore(metrics_.get())) { - AddDefaultUrlCallbacks(webserver_.get()); - statestore_->RegisterWebpages(webserver_.get()); -} - -Status InProcessStatestore::Start() { - RETURN_IF_ERROR(statestore_->Init()); - RETURN_IF_ERROR(webserver_->Start()); - boost::shared_ptr<TProcessor> processor( - new StatestoreServiceProcessor(statestore_->thrift_iface())); - - ThriftServerBuilder builder("StatestoreService", processor, statestore_port_); - if (IsInternalTlsConfigured()) { - LOG(INFO) << "Enabling SSL for Statestore"; - builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key); - } - ThriftServer* server; - ABORT_IF_ERROR(builder.metrics(metrics_.get()).Build(&server)); - statestore_server_.reset(server); - RETURN_IF_ERROR(Thread::Create("statestore", "main-loop", - &Statestore::MainLoop, statestore_.get(), &statestore_main_loop_)); - - RETURN_IF_ERROR(statestore_server_->Start()); - statestore_port_ = statestore_server_->port(); - return WaitForServer("localhost", statestore_port_, 10, 100); -} http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/testutil/in-process-servers.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h index 6ac9734..f863650 100644 --- a/be/src/testutil/in-process-servers.h +++ b/be/src/testutil/in-process-servers.h @@ -93,42 +93,6 @@ class InProcessImpalaServer { boost::scoped_ptr<ExecEnv> exec_env_; }; -/// An in-process statestore, with webserver and metrics. -class InProcessStatestore { - public: - - // Creates and starts an InProcessStatestore with ports chosen from the ephemeral port - // range. Returns OK and sets *statestore on success. On failure, an error is - /// returned and *statestore may or may not be set but is always invalid to use. - static Status StartWithEphemeralPorts(InProcessStatestore** statestore); - - /// Constructs but does not start the statestore. - InProcessStatestore(int statestore_port, int webserver_port); - - /// Starts the statestore server, and the processing thread. - Status Start(); - - uint32_t port() { return statestore_port_; } - - private: - /// Websever object to serve debug pages through. - boost::scoped_ptr<Webserver> webserver_; - - /// MetricGroup object - boost::scoped_ptr<MetricGroup> metrics_; - - /// Port to start the statestore on. - uint32_t statestore_port_; - - /// The statestore instance - boost::scoped_ptr<Statestore> statestore_; - - /// Statestore Thrift server - boost::scoped_ptr<ThriftServer> statestore_server_; - - std::unique_ptr<Thread> statestore_main_loop_; -}; - } #endif
