http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc index 026a06e..a170ea1 100644 --- a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -73,7 +73,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) { // set of in-flight queries. Status status = SetQueryInflight(session, request_state); if (!status.ok()) { - (void) UnregisterQuery(request_state->query_id(), false, &status); + discard_result(UnregisterQuery(request_state->query_id(), false, &status)); RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } TUniqueIdToQueryHandle(request_state->query_id(), &query_handle); @@ -111,7 +111,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, // set of in-flight queries. Status status = SetQueryInflight(session, request_state); if (!status.ok()) { - (void) UnregisterQuery(request_state->query_id(), false, &status); + discard_result(UnregisterQuery(request_state->query_id(), false, &status)); RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } // block until results are ready @@ -121,7 +121,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, status = request_state->query_status(); } if (!status.ok()) { - (void) UnregisterQuery(request_state->query_id(), false, &status); + discard_result(UnregisterQuery(request_state->query_id(), false, &status)); RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } @@ -171,7 +171,7 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle VLOG_ROW << "fetch result: #results=" << query_results.data.size() << " has_more=" << (query_results.has_more ? 
"true" : "false"); if (!status.ok()) { - (void) UnregisterQuery(query_id, false, &status); + discard_result(UnregisterQuery(query_id, false, &status)); RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 2528349..6a1b5f4 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -154,7 +154,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, Status exec_status = request_state->Exec(*request); if (!exec_status.ok()) { - (void) UnregisterQuery(request_state->query_id(), false, &exec_status); + discard_result(UnregisterQuery(request_state->query_id(), false, &exec_status)); status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); status->__set_errorMessage(exec_status.GetDetail()); status->__set_sqlState(SQLSTATE_GENERAL_ERROR); @@ -165,7 +165,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, Status inflight_status = SetQueryInflight(session, request_state); if (!inflight_status.ok()) { - (void) UnregisterQuery(request_state->query_id(), false, &inflight_status); + discard_result(UnregisterQuery(request_state->query_id(), false, &inflight_status)); status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); status->__set_errorMessage(inflight_status.GetDetail()); status->__set_sqlState(SQLSTATE_GENERAL_ERROR); @@ -340,8 +340,8 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val, } else { // Normal configuration key. Use it to set session default query options. // Ignore failure (failures will be logged in SetQueryOption()). 
- SetQueryOption(v.first, v.second, &state->default_query_options, - &state->set_query_options_mask); + discard_result(SetQueryOption(v.first, v.second, &state->default_query_options, + &state->set_query_options_mask)); } } } @@ -465,7 +465,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val, session->hs2_version, *request_state->result_metadata(), nullptr), cache_num_rows); if (!status.ok()) { - (void) UnregisterQuery(request_state->query_id(), false, &status); + discard_result(UnregisterQuery(request_state->query_id(), false, &status)); HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); } } @@ -476,7 +476,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val, // set of in-flight queries. status = SetQueryInflight(session, request_state); if (!status.ok()) { - (void) UnregisterQuery(request_state->query_id(), false, &status); + discard_result(UnregisterQuery(request_state->query_id(), false, &status)); HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); } return_val.__isset.operationHandle = true; @@ -795,7 +795,7 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val, if (status.IsRecoverableError()) { DCHECK(fetch_first); } else { - (void) UnregisterQuery(query_id, false, &status); + discard_result(UnregisterQuery(query_id, false, &status)); } HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-http-handler.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index 5a79d9d..79903b4 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -547,11 +547,12 @@ void ImpalaHttpHandler::CatalogObjectsHandler(const Webserver::ArgumentMap& args // Get the object type and name from the topic entry key 
TCatalogObject request; - TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request); - - // Get the object and dump its contents. TCatalogObject result; - Status status = server_->exec_env_->frontend()->GetCatalogObject(request, &result); + Status status = TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request); + if (status.ok()) { + // Get the object and dump its contents. + status = server_->exec_env_->frontend()->GetCatalogObject(request, &result); + } if (status.ok()) { Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator()); document->AddMember("thrift_string", debug_string, document->GetAllocator()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 00a9d9a..e173c84 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -73,6 +73,7 @@ #include "util/runtime-profile.h" #include "util/string-parser.h" #include "util/summary-util.h" +#include "util/test-info.h" #include "util/uid-util.h" #include "gen-cpp/Types_types.h" @@ -357,21 +358,22 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env->metrics())); - // Register the membership callback if required - if (exec_env->subscriber() != nullptr) { + // Register the membership callback if running in a real cluster. 
+ if (!TestInfo::is_test()) { auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state, vector<TTopicDelta>* topic_updates) { this->MembershipCallback(state, topic_updates); }; - exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb); + ABORT_IF_ERROR( + exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb)); if (FLAGS_is_coordinator) { auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state, vector<TTopicDelta>* topic_updates) { this->CatalogUpdateCallback(state, topic_updates); }; - exec_env->subscriber()->AddTopic(CatalogServer::IMPALA_CATALOG_TOPIC, true, - catalog_cb); + ABORT_IF_ERROR(exec_env->subscriber()->AddTopic( + CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb)); } } @@ -581,11 +583,11 @@ void ImpalaServer::LogQueryEvents(const ClientRequestState& request_state) { if (IsAuditEventLoggingEnabled() && (Frontend::IsAuthorizationError(request_state.query_status()) || log_events)) { // TODO: deal with an error status - (void) LogAuditRecord(request_state, request_state.exec_request()); + discard_result(LogAuditRecord(request_state, request_state.exec_request())); } if (IsLineageLoggingEnabled() && log_events) { // TODO: deal with an error status - (void) LogLineageRecord(request_state); + discard_result(LogLineageRecord(request_state)); } } @@ -622,7 +624,7 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id, RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(), request_state->user_has_profile_access())); if (base64_encoded) { - request_state->profile().SerializeToArchiveString(output); + RETURN_IF_ERROR(request_state->profile().SerializeToArchiveString(output)); } else { request_state->profile().PrettyPrint(output); } @@ -703,7 +705,10 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use [[noreturn]] void ImpalaServer::LogFileFlushThread() { while (true) { sleep(5); - profile_logger_->Flush(); + const 
Status status = profile_logger_->Flush(); + if (!status.ok()) { + LOG(WARNING) << "Error flushing profile log: " << status.GetDetail(); + } } } @@ -736,14 +741,21 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use } void ImpalaServer::ArchiveQuery(const ClientRequestState& query) { - const string& encoded_profile_str = query.profile().SerializeToArchiveString(); + string encoded_profile_str; + Status status = query.profile().SerializeToArchiveString(&encoded_profile_str); + if (!status.ok()) { + // Didn't serialize the string. Continue with empty string. + LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string " + << status.GetDetail(); + return; + } // If there was an error initialising archival (e.g. directory is not writeable), // FLAGS_log_query_to_file will have been set to false if (FLAGS_log_query_to_file) { stringstream ss; ss << UnixMillis() << " " << query.query_id() << " " << encoded_profile_str; - Status status = profile_logger_->AppendEntry(ss.str()); + status = profile_logger_->AppendEntry(ss.str()); if (!status.ok()) { LOG_EVERY_N(WARNING, 1000) << "Could not write to profile log file file (" << google::COUNTER << " attempts failed): " @@ -826,7 +838,7 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, Status status = ExecuteInternal(*query_ctx, session_state, &registered_request_state, request_state); if (!status.ok() && registered_request_state) { - (void) UnregisterQuery((*request_state)->query_id(), false, &status); + discard_result(UnregisterQuery((*request_state)->query_id(), false, &status)); } return status; } @@ -1105,7 +1117,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id, Status status("Session closed"); for (const TUniqueId& query_id: inflight_queries) { // TODO: deal with an error status - (void) UnregisterQuery(query_id, false, &status); + discard_result(UnregisterQuery(query_id, false, &status)); } // Reconfigure the poll period of session_timeout_thread_ 
if necessary. int32_t session_timeout = session_state->session_timeout; @@ -1427,7 +1439,7 @@ void ImpalaServer::CatalogUpdateCallback( } ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0); // TODO: deal with an error status - (void) UpdateCatalogMetrics(); + discard_result(UpdateCatalogMetrics()); // Remove all dropped objects from the library cache. // TODO: is this expensive? We'd like to process heartbeats promptly. for (TCatalogObject& object: dropped_objects) { @@ -1690,7 +1702,12 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque request_state.profile().PrettyPrint(&ss); profile_str = ss.str(); if (encoded_profile.empty()) { - encoded_profile_str = request_state.profile().SerializeToArchiveString(); + Status status = + request_state.profile().SerializeToArchiveString(&encoded_profile_str); + if (!status.ok()) { + LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string " + << status.GetDetail(); + } } else { encoded_profile_str = encoded_profile; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impalad-main.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc index 32fbcc8..8a7961c 100644 --- a/be/src/service/impalad-main.cc +++ b/be/src/service/impalad-main.cc @@ -77,7 +77,8 @@ int ImpaladMain(int argc, char** argv) { // start backend service for the coordinator on be_port ExecEnv exec_env; - StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver(), true); + ABORT_IF_ERROR( + StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver(), true)); InitRpcEventTracing(exec_env.webserver()); CommonMetrics::InitCommonMetrics(exec_env.metrics()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/query-options-test.cc ---------------------------------------------------------------------- diff --git 
a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index 9006947..397df5a 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -74,7 +74,8 @@ TEST(QueryOptions, SetFilterWait) { EXPECT_FALSE(SetQueryOption("RUNTIME_FILTER_WAIT_TIME_MS", "-1", &options, NULL).ok()); EXPECT_FALSE(SetQueryOption("RUNTIME_FILTER_WAIT_TIME_MS", - lexical_cast<string>(numeric_limits<int32_t>::max() + 1), &options, NULL).ok()); + lexical_cast<string>(static_cast<int64_t>(numeric_limits<int32_t>::max()) + 1), + &options, NULL).ok()); EXPECT_OK(SetQueryOption("RUNTIME_FILTER_WAIT_TIME_MS", "0", &options, NULL)); EXPECT_EQ(0, options.runtime_filter_wait_time_ms); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/statestore/statestore.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index 9f6097c..5d7738c 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -720,8 +720,13 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id, // Schedule the next message. VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " deadline for: " << subscriber->id() << " is in " << deadline_ms << "ms"; - OfferUpdate(make_pair(deadline_ms, subscriber->id()), is_heartbeat ? + status = OfferUpdate(make_pair(deadline_ms, subscriber->id()), is_heartbeat ? &subscriber_heartbeat_threadpool_ : &subscriber_topic_update_threadpool_); + if (!status.ok()) { + LOG(INFO) << "Unable to send next " << (is_heartbeat ? 
"heartbeat" : "update") + << " message to subscriber '" << subscriber->id() << "': " + << status.GetDetail(); + } } } } @@ -755,7 +760,6 @@ void Statestore::UnregisterSubscriber(Subscriber* subscriber) { subscribers_.erase(subscriber->id()); } -Status Statestore::MainLoop() { +void Statestore::MainLoop() { subscriber_topic_update_threadpool_.Join(); - return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/statestore/statestore.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index 44d9792..b3ba315 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -108,14 +108,12 @@ class Statestore : public CacheLineAligned { Status RegisterSubscriber(const SubscriberId& subscriber_id, const TNetworkAddress& location, const std::vector<TTopicRegistration>& topic_registrations, - TUniqueId* registration_id); + TUniqueId* registration_id) WARN_UNUSED_RESULT; void RegisterWebpages(Webserver* webserver); /// The main processing loop. Blocks until the exit flag is set. - // - /// Returns OK unless there is an unrecoverable error. - Status MainLoop(); + void MainLoop(); /// Returns the Thrift API interface that proxies requests onto the local Statestore. const boost::shared_ptr<StatestoreServiceIf>& thrift_iface() const { @@ -439,10 +437,10 @@ class Statestore : public CacheLineAligned { /// Utility method to add an update to the given thread pool, and to fail if the thread /// pool is already at capacity. Status OfferUpdate(const ScheduledSubscriberUpdate& update, - ThreadPool<ScheduledSubscriberUpdate>* thread_pool); + ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT; - /// Sends either a heartbeat or topic update message to the subscriber in 'update' at the - /// closest possible time to the first member of 'update'. 
If is_heartbeat is true, + /// Sends either a heartbeat or topic update message to the subscriber in 'update' at + /// the closest possible time to the first member of 'update'. If is_heartbeat is true, /// sends a heartbeat update, otherwise the set of pending topic updates is sent. Once /// complete, the next update is scheduled and added to the appropriate queue. void DoSubscriberUpdate(bool is_heartbeat, int thread_id, @@ -458,14 +456,15 @@ class Statestore : public CacheLineAligned { /// will return OK (since there was no error) and the output parameter update_skipped is /// set to true. Otherwise, any updates returned by the subscriber are applied to their /// target topics. - Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped); + Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) WARN_UNUSED_RESULT; /// Sends a heartbeat message to subscriber. Returns false if there was some error /// performing the RPC. - Status SendHeartbeat(Subscriber* subscriber); + Status SendHeartbeat(Subscriber* subscriber) WARN_UNUSED_RESULT; /// Unregister a subscriber, removing all of its transient entries and evicting it from - /// the subscriber map. Callers must hold subscribers_lock_ prior to calling this method. + /// the subscriber map. Callers must hold subscribers_lock_ prior to calling this + /// method. void UnregisterSubscriber(Subscriber* subscriber); /// Populates a TUpdateStateRequest with the update state for this subscriber. 
Iterates http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/statestore/statestored-main.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc index 1dcb682..1f06f04 100644 --- a/be/src/statestore/statestored-main.cc +++ b/be/src/statestore/statestored-main.cc @@ -67,10 +67,12 @@ int StatestoredMain(int argc, char** argv) { LOG(INFO) << "Not starting webserver"; } - metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr); + ABORT_IF_ERROR( + metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr)); ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), false, nullptr, nullptr)); StartMemoryMaintenanceThread(); - StartThreadInstrumentation(metrics.get(), webserver.get(), false); + ABORT_IF_ERROR( + StartThreadInstrumentation(metrics.get(), webserver.get(), false)); InitRpcEventTracing(webserver.get()); // TODO: Add a 'common metrics' method to add standard metrics to // both statestored and impalad http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/death-test-util.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/death-test-util.h b/be/src/testutil/death-test-util.h index 6421fb7..474025b 100644 --- a/be/src/testutil/death-test-util.h +++ b/be/src/testutil/death-test-util.h @@ -25,10 +25,10 @@ // Wrapper around gtest's ASSERT_DEBUG_DEATH that prevents coredumps and minidumps // being generated as the result of the death test. 
#ifndef NDEBUG -#define IMPALA_ASSERT_DEBUG_DEATH(fn, msg) \ - do { \ - ScopedCoredumpDisabler disable_coredumps; \ - ASSERT_DEBUG_DEATH((void)fn, msg); \ +#define IMPALA_ASSERT_DEBUG_DEATH(fn, msg) \ + do { \ + ScopedCoredumpDisabler disable_coredumps; \ + ASSERT_DEBUG_DEATH((void)fn, msg); \ } while (false); #else // Gtest's ASSERT_DEBUG_DEATH macro has peculiar semantics where in debug builds it http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/fault-injection-util.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/fault-injection-util.cc b/be/src/testutil/fault-injection-util.cc index e2c32b1..f378c48 100644 --- a/be/src/testutil/fault-injection-util.cc +++ b/be/src/testutil/fault-injection-util.cc @@ -19,6 +19,8 @@ #include "testutil/fault-injection-util.h" +#include <random> + #include <thrift/transport/TSSLSocket.h> #include <thrift/transport/TTransportException.h> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/impalad-query-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/impalad-query-executor.cc b/be/src/testutil/impalad-query-executor.cc index db0aceb..111b2a6 100644 --- a/be/src/testutil/impalad-query-executor.cc +++ b/be/src/testutil/impalad-query-executor.cc @@ -44,7 +44,7 @@ ImpaladQueryExecutor::ImpaladQueryExecutor(const string& hostname, uint32_t port } ImpaladQueryExecutor::~ImpaladQueryExecutor() { - Close(); + discard_result(Close()); } Status ImpaladQueryExecutor::Setup() { @@ -71,7 +71,7 @@ Status ImpaladQueryExecutor::Close() { Status ImpaladQueryExecutor::Exec( const string& query_string, vector<FieldSchema>* col_schema) { // close anything that ran previously - Close(); + discard_result(Close()); Query query; query.query = query_string; query.configuration = exec_options_; 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/in-process-servers.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc index 7a81915..4fecfb3 100644 --- a/be/src/testutil/in-process-servers.cc +++ b/be/src/testutil/in-process-servers.cc @@ -68,7 +68,8 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts( // pick a new set of ports Status started = impala->StartWithClientServers(beeswax_port, hs2_port); if (started.ok()) { - impala->SetCatalogInitialized(); + const Status status = impala->SetCatalogInitialized(); + if (!status.ok()) LOG(WARNING) << status.GetDetail(); return impala; } delete impala; @@ -88,13 +89,14 @@ InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend statestore_host, statestore_port)) { } -void InProcessImpalaServer::SetCatalogInitialized() { +Status InProcessImpalaServer::SetCatalogInitialized() { DCHECK(impala_server_ != NULL) << "Call Start*() first."; - exec_env_->frontend()->SetCatalogInitialized(); + return exec_env_->frontend()->SetCatalogInitialized(); } Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int hs2_port) { RETURN_IF_ERROR(exec_env_->StartServices()); + beeswax_port_ = beeswax_port; hs2_port_ = hs2_port; ThriftServer* be_server; @@ -158,7 +160,7 @@ InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port } Status InProcessStatestore::Start() { - webserver_->Start(); + RETURN_IF_ERROR(webserver_->Start()); boost::shared_ptr<TProcessor> processor( new StatestoreServiceProcessor(statestore_->thrift_iface())); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/in-process-servers.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h index 
3f91b2a..03b02f3 100644 --- a/be/src/testutil/in-process-servers.h +++ b/be/src/testutil/in-process-servers.h @@ -74,7 +74,7 @@ class InProcessImpalaServer { /// Sets the catalog on this impalad to be initialized. If we don't /// start up a catalogd, then there is no one to initialize it otherwise. - void SetCatalogInitialized(); + Status SetCatalogInitialized(); uint32_t beeswax_port() const { return beeswax_port_; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/util/benchmark.cc b/be/src/util/benchmark.cc index 43570a6..6ffbf00 100644 --- a/be/src/util/benchmark.cc +++ b/be/src/util/benchmark.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include <cmath> #include <iomanip> #include <iostream> #include <sstream> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/bit-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc index 2c40e569..5f8d443 100644 --- a/be/src/util/bit-util-test.cc +++ b/be/src/util/bit-util-test.cc @@ -17,10 +17,12 @@ #include <stdlib.h> #include <stdio.h> -#include <iostream> -#include <algorithm> #include <limits.h> +#include <algorithm> +#include <iostream> +#include <numeric> + #include <boost/utility.hpp> #include "testutil/gtest-util.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/codec.h ---------------------------------------------------------------------- diff --git a/be/src/util/codec.h b/be/src/util/codec.h index 9475ec1..b150b3c 100644 --- a/be/src/util/codec.h +++ b/be/src/util/codec.h @@ -63,7 +63,8 @@ class Codec { /// If mem_pool is nullptr, then the resulting codec will never allocate memory and /// the caller must be responsible for it. 
static Status CreateDecompressor(MemPool* mem_pool, bool reuse, - THdfsCompression::type format, boost::scoped_ptr<Codec>* decompressor); + THdfsCompression::type format, + boost::scoped_ptr<Codec>* decompressor) WARN_UNUSED_RESULT; /// Alternate factory method: takes a codec string and populates a scoped pointer. static Status CreateDecompressor(MemPool* mem_pool, bool reuse, @@ -88,7 +89,8 @@ class Codec { /// Return the name of a compression algorithm. static std::string GetCodecName(THdfsCompression::type); /// Returns the java class name for the given compression type - static Status GetHadoopCodecClassName(THdfsCompression::type, std::string* out_name); + static Status GetHadoopCodecClassName( + THdfsCompression::type, std::string* out_name) WARN_UNUSED_RESULT; virtual ~Codec() {} @@ -109,7 +111,8 @@ class Codec { /// input_length: length of the data to process /// input: data to process virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output) = 0; + const uint8_t* input, int64_t* output_length, + uint8_t** output) WARN_UNUSED_RESULT = 0; /// Wrapper to the actual ProcessBlock() function. This wrapper uses lengths as ints and /// not int64_ts. We need to keep this interface because the Parquet thrift uses ints. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/filesystem-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h index 3e824b8..1d497c3 100644 --- a/be/src/util/filesystem-util.h +++ b/be/src/util/filesystem-util.h @@ -31,22 +31,23 @@ class FileSystemUtil { /// Create the specified directory and any ancestor directories that do not exist yet. /// The directory and its contents are destroyed if it already exists. /// Returns Status::OK if successful, or a runtime error with a message otherwise. 
- static Status RemoveAndCreateDirectory(const std::string& directory); + static Status RemoveAndCreateDirectory(const std::string& directory) WARN_UNUSED_RESULT; /// Create a file at the specified path. - static Status CreateFile(const std::string& file_path); + static Status CreateFile(const std::string& file_path) WARN_UNUSED_RESULT; /// Remove the specified paths and their enclosing files/directories. - static Status RemovePaths(const std::vector<std::string>& directories); + static Status RemovePaths( + const std::vector<std::string>& directories) WARN_UNUSED_RESULT; /// Verify that the specified path is an existing directory. /// Returns Status::OK if it is, or a runtime error with a message otherwise. - static Status VerifyIsDirectory(const std::string& directory_path); + static Status VerifyIsDirectory(const std::string& directory_path) WARN_UNUSED_RESULT; /// Returns the space available on the file system containing 'directory_path' /// in 'available_bytes' - static Status GetSpaceAvailable(const std::string& directory_path, - uint64_t* available_bytes); + static Status GetSpaceAvailable( + const std::string& directory_path, uint64_t* available_bytes) WARN_UNUSED_RESULT; /// Returns the currently allowed maximum of possible file descriptors. In case of an /// error returns 0. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/hdfs-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-util-test.cc b/be/src/util/hdfs-util-test.cc index 89cc43a..b389864 100644 --- a/be/src/util/hdfs-util-test.cc +++ b/be/src/util/hdfs-util-test.cc @@ -34,7 +34,8 @@ TEST(HdfsUtilTest, CheckFilesystemsMatch) { ExecEnv* exec_env = new ExecEnv(); // We do this to retrieve the default FS from the frontend. - exec_env->StartServices(); + // It doesn't matter if starting the services fails. + discard_result(exec_env->StartServices()); // Tests with both paths qualified. 
EXPECT_TRUE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path", http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/jni-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc index 78f67a7..220eb83 100644 --- a/be/src/util/jni-util.cc +++ b/be/src/util/jni-util.cc @@ -100,12 +100,6 @@ Status JniUtil::LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global return Status::OK(); } -Status JniUtil::FreeGlobalRef(JNIEnv* env, jobject global_ref) { - env->DeleteGlobalRef(global_ref); - RETURN_ERROR_IF_EXC(env); - return Status::OK(); -} - Status JniUtil::Init() { // Get the JNIEnv* corresponding to current thread. JNIEnv* env = getJNIEnv(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/jni-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index 9a9cb15..77abb4d 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -124,7 +124,7 @@ class JniLocalFrame { /// The number of local references created inside the frame might exceed max_local_ref, /// but there is no guarantee that memory will be available. /// Push should be called at most once. - Status push(JNIEnv* env, int max_local_ref=10); + Status push(JNIEnv* env, int max_local_ref = 10) WARN_UNUSED_RESULT; private: JNIEnv* env_; @@ -187,7 +187,7 @@ class JniUtil { static void InitLibhdfs(); /// Find JniUtil class, and get JniUtil.throwableToString method id - static Status Init(); + static Status Init() WARN_UNUSED_RESULT; /// Returns true if the given class could be found on the CLASSPATH in env. /// Returns false otherwise, or if any other error occurred (e.g. a JNI exception). @@ -204,13 +204,15 @@ class JniUtil { /// The returned reference must eventually be freed by calling FreeGlobalRef() (or have /// the lifetime of the impalad process). 
/// Catches Java exceptions and converts their message into status. - static Status GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref); + static Status GetGlobalClassRef( + JNIEnv* env, const char* class_str, jclass* class_ref) WARN_UNUSED_RESULT; /// Creates a global reference from a local reference returned into global_ref. /// The returned reference must eventually be freed by calling FreeGlobalRef() (or have /// the lifetime of the impalad process). /// Catches Java exceptions and converts their message into status. - static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global_ref); + static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref, + jobject* global_ref) WARN_UNUSED_RESULT; /// Templated wrapper for jobject subclasses (e.g. jclass, jarray). This is necessary /// because according to @@ -224,15 +226,11 @@ class JniUtil { /// to use a subclass like _jclass**. This is safe in this case because the returned /// subclass is known to be correct. template <typename jobject_subclass> - static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref, - jobject_subclass* global_ref) { + static Status LocalToGlobalRef( + JNIEnv* env, jobject local_ref, jobject_subclass* global_ref) { return LocalToGlobalRef(env, local_ref, reinterpret_cast<jobject*>(global_ref)); } - /// Deletes 'global_ref'. Catches Java exceptions and converts their message into - /// status. - static Status FreeGlobalRef(JNIEnv* env, jobject global_ref); - static jmethodID throwable_to_string_id() { return throwable_to_string_id_; } static jmethodID throwable_to_stack_trace_id() { return throwable_to_stack_trace_id_; } @@ -246,30 +244,31 @@ class JniUtil { /// log_stack determines if the stack trace is written to the log /// prefix, if non-empty will be prepended to the error message. 
static Status GetJniExceptionMsg(JNIEnv* env, bool log_stack = true, - const std::string& prefix = ""); + const std::string& prefix = "") WARN_UNUSED_RESULT; /// Populates 'result' with a list of memory metrics from the Jvm. Returns Status::OK /// unless there is an exception. static Status GetJvmMetrics(const TGetJvmMetricsRequest& request, - TGetJvmMetricsResponse* result); + TGetJvmMetricsResponse* result) WARN_UNUSED_RESULT; // Populates 'result' with information about live JVM threads. Returns // Status::OK unless there is an exception. static Status GetJvmThreadsInfo(const TGetJvmThreadsInfoRequest& request, - TGetJvmThreadsInfoResponse* result); + TGetJvmThreadsInfoResponse* result) WARN_UNUSED_RESULT; /// Loads a method whose signature is in the supplied descriptor. Returns Status::OK /// and sets descriptor->method_id to a JNI method handle if successful, otherwise an /// error status is returned. static Status LoadJniMethod(JNIEnv* jni_env, const jclass& jni_class, - JniMethodDescriptor* descriptor); + JniMethodDescriptor* descriptor) WARN_UNUSED_RESULT; /// Same as LoadJniMethod(...), except that this loads a static method. static Status LoadStaticJniMethod(JNIEnv* jni_env, const jclass& jni_class, - JniMethodDescriptor* descriptor); + JniMethodDescriptor* descriptor) WARN_UNUSED_RESULT; /// Utility methods to avoid repeating lots of the JNI call boilerplate. 
- static Status CallJniMethod(const jobject& obj, const jmethodID& method) { + static Status CallJniMethod( + const jobject& obj, const jmethodID& method) WARN_UNUSED_RESULT { JNIEnv* jni_env = getJNIEnv(); JniLocalFrame jni_frame; RETURN_IF_ERROR(jni_frame.push(jni_env)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/memory-metrics.h ---------------------------------------------------------------------- diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h index ffc3d20..6f306f7 100644 --- a/be/src/util/memory-metrics.h +++ b/be/src/util/memory-metrics.h @@ -155,7 +155,7 @@ class JvmMetric : public IntGauge { public: /// Registers many Jvm memory metrics: one for every member of JvmMetricType for each /// pool (usually ~5 pools plus a synthetic 'total' pool). - static Status InitMetrics(MetricGroup* metrics); + static Status InitMetrics(MetricGroup* metrics) WARN_UNUSED_RESULT; protected: /// Searches through jvm_metrics_response_ for a matching memory pool and pulls out the @@ -193,7 +193,7 @@ class JvmMetric : public IntGauge { class BufferPoolMetric : public UIntGauge { public: static Status InitMetrics(MetricGroup* metrics, ReservationTracker* global_reservations, - BufferPool* buffer_pool); + BufferPool* buffer_pool) WARN_UNUSED_RESULT; /// Global metrics, initialized by CreateAndRegisterMetrics(). 
static BufferPoolMetric* LIMIT; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/metrics-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc index 543e0e3..f25d5ad 100644 --- a/be/src/util/metrics-test.cc +++ b/be/src/util/metrics-test.cc @@ -217,7 +217,7 @@ TEST_F(MetricsTest, StatsMetricsSingle) { TEST_F(MetricsTest, MemMetric) { #ifndef ADDRESS_SANITIZER MetricGroup metrics("MemMetrics"); - RegisterMemoryMetrics(&metrics, false, nullptr, nullptr); + ASSERT_OK(RegisterMemoryMetrics(&metrics, false, nullptr, nullptr)); // Smoke test to confirm that tcmalloc metrics are returning reasonable values. UIntGauge* bytes_in_use = metrics.FindMetricForTesting<UIntGauge>("tcmalloc.bytes-in-use"); @@ -249,7 +249,7 @@ TEST_F(MetricsTest, MemMetric) { TEST_F(MetricsTest, JvmMetrics) { MetricGroup metrics("JvmMetrics"); - RegisterMemoryMetrics(&metrics, true, nullptr, nullptr); + ASSERT_OK(RegisterMemoryMetrics(&metrics, true, nullptr, nullptr)); UIntGauge* jvm_total_used = metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<UIntGauge>( "jvm.total.current-usage-bytes"); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/network-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h index 2615184..1783589 100644 --- a/be/src/util/network-util.h +++ b/be/src/util/network-util.h @@ -32,7 +32,7 @@ typedef std::string IpAddr; /// 'address'. If the IP addresses of a host don't change, then subsequent calls will /// always return the same address. Returns an error status if any system call failed, /// otherwise OK. Even if OK is returned, addresses may still be of zero length. 
-Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip); +Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) WARN_UNUSED_RESULT; /// Finds the first non-localhost IP address in the given list. Returns /// true if such an address was found, false otherwise. @@ -40,7 +40,7 @@ bool FindFirstNonLocalhost(const std::vector<std::string>& addresses, std::strin /// Sets the output argument to the system defined hostname. /// Returns OK if a hostname can be found, false otherwise. -Status GetHostname(std::string* hostname); +Status GetHostname(std::string* hostname) WARN_UNUSED_RESULT; /// Utility method because Thrift does not supply useful constructors TNetworkAddress MakeNetworkAddress(const std::string& hostname, int port); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/parquet-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc index e21cff6..d5b0d01 100644 --- a/be/src/util/parquet-reader.cc +++ b/be/src/util/parquet-reader.cc @@ -136,6 +136,7 @@ class ParquetLevelReader : public impala::RleDecoder { // were actually written if the final run is a literal run, only if the final run is // a repeated run (see util/rle-encoding.h for more details). // Returns the number of rows specified by the header. +// Aborts the process if reading the file fails. 
int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_t* page) { const uint8_t* data = page; std::vector<uint8_t> decompressed_buffer; @@ -143,8 +144,8 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_ decompressed_buffer.resize(header.uncompressed_page_size); boost::scoped_ptr<impala::Codec> decompressor; - impala::Codec::CreateDecompressor( - NULL, false, impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor); + ABORT_IF_ERROR(impala::Codec::CreateDecompressor(NULL, false, + impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor)); uint8_t* buffer_ptr = decompressed_buffer.data(); int uncompressed_page_size = header.uncompressed_page_size; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/runtime-profile.cc ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc index ec31fc9..12f4e25 100644 --- a/be/src/util/runtime-profile.cc +++ b/be/src/util/runtime-profile.cc @@ -33,6 +33,7 @@ #include "util/periodic-counter-updater.h" #include "util/pretty-printer.h" #include "util/redactor.h" +#include "util/scope-exit-trigger.h" #include "common/names.h" @@ -723,37 +724,39 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const { } } -string RuntimeProfile::SerializeToArchiveString() const { +Status RuntimeProfile::SerializeToArchiveString(string* out) const { stringstream ss; - SerializeToArchiveString(&ss); - return ss.str(); + RETURN_IF_ERROR(SerializeToArchiveString(&ss)); + *out = ss.str(); + return Status::OK(); } -void RuntimeProfile::SerializeToArchiveString(stringstream* out) const { +Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const { + Status status; TRuntimeProfileTree thrift_object; const_cast<RuntimeProfile*>(this)->ToThrift(&thrift_object); ThriftSerializer serializer(true); vector<uint8_t> 
serialized_buffer; - Status status = serializer.Serialize(&thrift_object, &serialized_buffer); - if (!status.ok()) return; + RETURN_IF_ERROR(serializer.Serialize(&thrift_object, &serialized_buffer)); // Compress the serialized thrift string. This uses string keys and is very // easy to compress. scoped_ptr<Codec> compressor; - status = Codec::CreateCompressor(NULL, false, THdfsCompression::DEFAULT, &compressor); - DCHECK(status.ok()) << status.GetDetail(); - if (!status.ok()) return; + RETURN_IF_ERROR( + Codec::CreateCompressor(NULL, false, THdfsCompression::DEFAULT, &compressor)); + const auto close_compressor = + MakeScopeExitTrigger([&compressor]() { compressor->Close(); }); vector<uint8_t> compressed_buffer; compressed_buffer.resize(compressor->MaxOutputLen(serialized_buffer.size())); int64_t result_len = compressed_buffer.size(); uint8_t* compressed_buffer_ptr = compressed_buffer.data(); - compressor->ProcessBlock(true, serialized_buffer.size(), serialized_buffer.data(), - &result_len, &compressed_buffer_ptr); + RETURN_IF_ERROR(compressor->ProcessBlock(true, serialized_buffer.size(), + serialized_buffer.data(), &result_len, &compressed_buffer_ptr)); compressed_buffer.resize(result_len); Base64Encode(compressed_buffer, out); - compressor->Close(); + return Status::OK();; } void RuntimeProfile::ToThrift(TRuntimeProfileTree* tree) const { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/runtime-profile.h ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h index 244ab17..298c214 100644 --- a/be/src/util/runtime-profile.h +++ b/be/src/util/runtime-profile.h @@ -265,8 +265,8 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s /// object using thrift compact binary format, then gzip compresses it and /// finally encodes it as base64. This is not a lightweight operation and /// should not be in the hot path. 
- std::string SerializeToArchiveString() const; - void SerializeToArchiveString(std::stringstream* out) const; + Status SerializeToArchiveString(std::string* out) const WARN_UNUSED_RESULT; + Status SerializeToArchiveString(std::stringstream* out) const WARN_UNUSED_RESULT; /// Divides all counters by n void Divide(int n); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/thread-pool.h ---------------------------------------------------------------------- diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h index cbc0031..800f690 100644 --- a/be/src/util/thread-pool.h +++ b/be/src/util/thread-pool.h @@ -52,7 +52,7 @@ class ThreadPool : public CacheLineAligned { for (int i = 0; i < num_threads; ++i) { std::stringstream threadname; threadname << thread_prefix << "(" << i + 1 << ":" << num_threads << ")"; - threads_.AddThread(new Thread(group, threadname.str(), + threads_.AddThread(std::make_unique<Thread>(group, threadname.str(), boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i))); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/thread.cc ---------------------------------------------------------------------- diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc index c84ef0b..0e08ab1 100644 --- a/be/src/util/thread.cc +++ b/be/src/util/thread.cc @@ -34,7 +34,6 @@ #include "common/names.h" namespace this_thread = boost::this_thread; -using boost::ptr_vector; using namespace rapidjson; namespace impala { @@ -331,13 +330,12 @@ void Thread::SuperviseThread(const string& name, const string& category, thread_mgr_ref->RemoveThread(this_thread::get_id(), category_copy); } -Status ThreadGroup::AddThread(Thread* thread) { - threads_.push_back(thread); - return Status::OK(); +void ThreadGroup::AddThread(unique_ptr<Thread> thread) { + threads_.emplace_back(move(thread)); } void ThreadGroup::JoinAll() { - for (const Thread& thread: threads_) thread.Join(); + for (auto& 
thread : threads_) thread->Join(); } int ThreadGroup::Size() const { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/thread.h ---------------------------------------------------------------------- diff --git a/be/src/util/thread.h b/be/src/util/thread.h index e21be7c..18f3a75 100644 --- a/be/src/util/thread.h +++ b/be/src/util/thread.h @@ -18,11 +18,13 @@ #ifndef IMPALA_UTIL_THREAD_H #define IMPALA_UTIL_THREAD_H +#include <memory> +#include <vector> + #include <boost/bind.hpp> #include <boost/function.hpp> #include <boost/scoped_ptr.hpp> #include <boost/thread.hpp> -#include <boost/ptr_container/ptr_vector.hpp> #include "common/status.h" #include "util/promise.h" @@ -173,7 +175,7 @@ class ThreadGroup { /// will destroy it when the ThreadGroup is destroyed. Threads will linger until that /// point (even if terminated), however, so callers should be mindful of the cost of /// placing very many threads in this set. - Status AddThread(Thread* thread); + void AddThread(std::unique_ptr<Thread> thread); /// Waits for all threads to finish. DO NOT call this from a thread inside this set; /// deadlock will predictably ensue. @@ -184,7 +186,7 @@ class ThreadGroup { private: /// All the threads grouped by this set. - boost::ptr_vector<Thread> threads_; + std::vector<std::unique_ptr<Thread>> threads_; }; /// Initialises the threading subsystem. Must be called before a Thread is created.
