This is an automated email from the ASF dual-hosted git repository. bikram pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push: new cbddda4 IMPALA-8162: Add memory reserved and admitted to the backends debug page cbddda4 is described below commit cbddda46a1b9ff153e3ff9213b5bae7e8da9f19a Author: Bikramjeet Vig <bikramjeet....@cloudera.com> AuthorDate: Mon Feb 4 12:44:44 2019 -0800 IMPALA-8162: Add memory reserved and admitted to the backends debug page This patch adds the memory reserved and memory admitted per backend to the backends debug page. It also fixes a data race in impala server which can potentially crash impalad if the impala http handler tries to access a backend descriptor that got concurrently removed by a thread processing a membership update from the statestore. Testing: - Added a sanity check for the backends page. - Did manual stress testing by running the e2e tests and constantly fetching the backends debug page. Change-Id: Ibaad7676c5180d8d6dbf6ac84d930c79d66d1fcd Reviewed-on: http://gerrit.cloudera.org:8080/12364 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/scheduling/admission-controller.cc | 8 ++ be/src/scheduling/admission-controller.h | 6 ++ be/src/service/impala-http-handler.cc | 16 +++- be/src/service/impala-server.cc | 129 ++++++++++++++++-------------- be/src/service/impala-server.h | 15 +++- be/src/statestore/statestore.h | 12 ++- tests/webserver/test_web_pages.py | 12 +++ www/admission_controller.tmpl | 8 +- www/backends.tmpl | 4 + 9 files changed, 139 insertions(+), 71 deletions(-) diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 848060e..77e6d4b 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -1261,4 +1261,12 @@ void AdmissionController::PoolStats::InitMetrics() { metrics_.clamp_mem_limit_query_option = parent_->metrics_group_->AddProperty<bool>( POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT, false, name_); } + +void AdmissionController::PopulatePerHostMemReservedAndAdmitted( + std::unordered_map<string, pair<int64_t, int64_t>>* mem_map) { + lock_guard<mutex> l(admission_ctrl_lock_); + for (const auto& elem: host_mem_reserved_) { + (*mem_map)[elem.first] = make_pair(elem.second, host_mem_admitted_[elem.first]); + } +} } diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index 0fa1596..c13d65f 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -271,6 +271,12 @@ class AdmissionController { /// Calls ResetInformationalStats on all pools. void ResetAllPoolInformationalStats(); + // Populates the input map with the per host memory reserved and admitted in the + // following format: <host_address_str, pair<mem_reserved, mem_admitted>>. + // Only used for populating the 'backends' debug page. + void PopulatePerHostMemReservedAndAdmitted( + std::unordered_map<std::string, std::pair<int64_t, int64_t>>* mem_map); + private: class PoolStats; friend class PoolStats; diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index 394c41e..424e5cf 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -32,10 +32,10 @@ #include "runtime/query-state.h" #include "runtime/timestamp-value.h" #include "runtime/timestamp-value.inline.h" +#include "scheduling/admission-controller.h" #include "service/impala-server.h" #include "service/client-request-state.h" #include "service/frontend.h" -#include "scheduling/admission-controller.h" #include "thrift/protocol/TDebugProtocol.h" #include "util/coding-util.h" #include "util/logging-support.h" @@ -845,11 +845,15 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args, Document* document) { + std::unordered_map<string, pair<int64_t, int64_t>> host_mem_map; + ExecEnv::GetInstance()->admission_controller()->PopulatePerHostMemReservedAndAdmitted( + &host_mem_map); Value backends_list(kArrayType); for (const auto& entry : server_->GetKnownBackends()) { TBackendDescriptor backend = entry.second; Value backend_obj(kObjectType); - Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator()); + string address = TNetworkAddressToString(backend.address); + Value str(address.c_str(), document->GetAllocator()); backend_obj.AddMember("address", str, document->GetAllocator()); backend_obj.AddMember("is_coordinator", backend.is_coordinator, document->GetAllocator()); @@ -858,6 +862,14 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args, Value admit_mem_limit(PrettyPrinter::PrintBytes(backend.admit_mem_limit).c_str(), document->GetAllocator()); backend_obj.AddMember("admit_mem_limit", admit_mem_limit, document->GetAllocator()); + // If the host address does not exist in the 'host_mem_map', this would ensure that a + // value of zero is used for those addresses. + Value mem_reserved(PrettyPrinter::PrintBytes(host_mem_map[address].first).c_str(), + document->GetAllocator()); + backend_obj.AddMember("mem_reserved", mem_reserved, document->GetAllocator()); + Value mem_admitted(PrettyPrinter::PrintBytes(host_mem_map[address].second).c_str(), + document->GetAllocator()); + backend_obj.AddMember("mem_admitted", mem_admitted, document->GetAllocator()); backends_list.PushBack(backend_obj, document->GetAllocator()); } document->AddMember("backends", backends_list, document->GetAllocator()); diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 5b18cdd..cf16563 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -478,7 +478,8 @@ int ImpalaServer::GetHS2Port() { return hs2_server_->port(); } -const ImpalaServer::BackendDescriptorMap& ImpalaServer::GetKnownBackends() { +const ImpalaServer::BackendDescriptorMap ImpalaServer::GetKnownBackends() { + lock_guard<mutex> l(known_backends_lock_); return known_backends_; } @@ -1710,15 +1711,20 @@ void ImpalaServer::MembershipCallback( // statestore heartbeat less frequently. StatestoreSubscriber::TopicDeltaMap::const_iterator topic = incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC); + if (topic == incoming_topic_deltas.end()) return; - if (topic != incoming_topic_deltas.end()) { - const TTopicDelta& delta = topic->second; + const TTopicDelta& delta = topic->second; + // Create a set of known backend network addresses. Used to test for cluster + // membership by network address. + set<TNetworkAddress> current_membership; + { + lock_guard<mutex> l(known_backends_lock_); // If this is not a delta, the update should include all entries in the topic so // clear the saved mapping of known backends. if (!delta.is_delta) known_backends_.clear(); // Process membership additions/deletions. - for (const TTopicItem& item: delta.topic_entries) { + for (const TTopicItem& item : delta.topic_entries) { if (item.deleted) { auto entry = known_backends_.find(item.key); // Remove stale connections to removed members. @@ -1730,8 +1736,9 @@ void ImpalaServer::MembershipCallback( } uint32_t len = item.value.size(); TBackendDescriptor backend_descriptor; - Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>( - item.value.data()), &len, false, &backend_descriptor); + Status status = + DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(item.value.data()), + &len, false, &backend_descriptor); if (!status.ok()) { VLOG(2) << "Error deserializing topic item with key: " << item.key; continue; @@ -1745,18 +1752,15 @@ void ImpalaServer::MembershipCallback( } } - // Register the local backend in the statestore and update the list of known backends. - // Only register if all ports have been opened and are ready. + // Register the local backend in the statestore and update the list of known + // backends. Only register if all ports have been opened and are ready. if (services_started_.load()) AddLocalBackendToStatestore(subscriber_topic_updates); - // Create a set of known backend network addresses. Used to test for cluster - // membership by network address. - set<TNetworkAddress> current_membership; // Also reflect changes to the frontend. Initialized only if any_changes is true. // Only send the hostname and ip_address of the executors to the frontend. TUpdateExecutorMembershipRequest update_req; bool any_changes = !delta.topic_entries.empty() || !delta.is_delta; - for (const BackendDescriptorMap::value_type& backend: known_backends_) { + for (const BackendDescriptorMap::value_type& backend : known_backends_) { current_membership.insert(backend.second.address); if (any_changes && backend.second.is_executor) { update_req.hostnames.insert(backend.second.address.hostname); @@ -1771,60 +1775,65 @@ void ImpalaServer::MembershipCallback( << status.GetDetail(); } } + } - // Maps from query id (to be cancelled) to a list of failed Impalads that are - // the cause of the cancellation. - map<TUniqueId, vector<TNetworkAddress>> queries_to_cancel; - { - // Build a list of queries that are running on failed hosts (as evidenced by their - // absence from the membership list). - // TODO: crash-restart failures can give false negatives for failed Impala demons. - lock_guard<mutex> l(query_locations_lock_); - QueryLocations::const_iterator loc_entry = query_locations_.begin(); - while (loc_entry != query_locations_.end()) { - if (current_membership.find(loc_entry->first) == current_membership.end()) { - unordered_set<TUniqueId>::const_iterator query_id = loc_entry->second.begin(); - // Add failed backend locations to all queries that ran on that backend. - for(; query_id != loc_entry->second.end(); ++query_id) { - vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id]; - failed_hosts.push_back(loc_entry->first); - } - // We can remove the location wholesale once we know backend's failed. To do so - // safely during iteration, we have to be careful not in invalidate the current - // iterator, so copy the iterator to do the erase(..) and advance the original. - QueryLocations::const_iterator failed_backend = loc_entry; - ++loc_entry; - query_locations_.erase(failed_backend); - } else { - ++loc_entry; + CancelQueriesOnFailedBackends(current_membership); +} + +void ImpalaServer::CancelQueriesOnFailedBackends( + const set<TNetworkAddress>& current_membership) { + // Maps from query id (to be cancelled) to a list of failed Impalads that are + // the cause of the cancellation. + map<TUniqueId, vector<TNetworkAddress>> queries_to_cancel; + { + // Build a list of queries that are running on failed hosts (as evidenced by their + // absence from the membership list). + // TODO: crash-restart failures can give false negatives for failed Impala demons. + lock_guard<mutex> l(query_locations_lock_); + QueryLocations::const_iterator loc_entry = query_locations_.begin(); + while (loc_entry != query_locations_.end()) { + if (current_membership.find(loc_entry->first) == current_membership.end()) { + unordered_set<TUniqueId>::const_iterator query_id = loc_entry->second.begin(); + // Add failed backend locations to all queries that ran on that backend. + for(; query_id != loc_entry->second.end(); ++query_id) { + vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id]; + failed_hosts.push_back(loc_entry->first); } + // We can remove the location wholesale once we know backend's failed. To do so + // safely during iteration, we have to be careful not in invalidate the current + // iterator, so copy the iterator to do the erase(..) and advance the original. + QueryLocations::const_iterator failed_backend = loc_entry; + ++loc_entry; + query_locations_.erase(failed_backend); + } else { + ++loc_entry; } } + } - if (cancellation_thread_pool_->GetQueueSize() + queries_to_cancel.size() > - MAX_CANCELLATION_QUEUE_SIZE) { - // Ignore the cancellations - we'll be able to process them on the next heartbeat - // instead. - LOG_EVERY_N(WARNING, 60) << "Cancellation queue is full"; - } else { - // Since we are the only producer for this pool, we know that this cannot block - // indefinitely since the queue is large enough to accept all new cancellation - // requests. - map<TUniqueId, vector<TNetworkAddress>>::iterator cancellation_entry; - for (cancellation_entry = queries_to_cancel.begin(); - cancellation_entry != queries_to_cancel.end(); - ++cancellation_entry) { - stringstream backends_ss; - for (int i = 0; i < cancellation_entry->second.size(); ++i) { - backends_ss << TNetworkAddressToString(cancellation_entry->second[i]); - if (i + 1 != cancellation_entry->second.size()) backends_ss << ", "; - } - VLOG_QUERY << "Backends failed for query " << PrintId(cancellation_entry->first) - << ", adding to queue to check for cancellation: " - << backends_ss.str(); - cancellation_thread_pool_->Offer(CancellationWork::BackendFailure( - cancellation_entry->first, cancellation_entry->second)); + if (cancellation_thread_pool_->GetQueueSize() + queries_to_cancel.size() > + MAX_CANCELLATION_QUEUE_SIZE) { + // Ignore the cancellations - we'll be able to process them on the next heartbeat + // instead. + LOG_EVERY_N(WARNING, 60) << "Cancellation queue is full"; + } else { + // Since we are the only producer for this pool, we know that this cannot block + // indefinitely since the queue is large enough to accept all new cancellation + // requests. + map<TUniqueId, vector<TNetworkAddress>>::iterator cancellation_entry; + for (cancellation_entry = queries_to_cancel.begin(); + cancellation_entry != queries_to_cancel.end(); + ++cancellation_entry) { + stringstream backends_ss; + for (int i = 0; i < cancellation_entry->second.size(); ++i) { + backends_ss << TNetworkAddressToString(cancellation_entry->second[i]); + if (i + 1 != cancellation_entry->second.size()) backends_ss << ", "; } + VLOG_QUERY << "Backends failed for query " << PrintId(cancellation_entry->first) + << ", adding to queue to check for cancellation: " + << backends_ss.str(); + cancellation_thread_pool_->Offer(CancellationWork::BackendFailure( + cancellation_entry->first, cancellation_entry->second)); } } } diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index dd7d221..69c1f00 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -156,6 +156,7 @@ class QuerySchedule; /// * uuid_lock_ /// * catalog_version_lock_ /// * connection_to_sessions_map_lock_ +/// * known_backends_lock_ /// /// TODO: The same doesn't apply to the execution state of an individual plan /// fragment: the originating coordinator might die, but we can get notified of @@ -401,7 +402,7 @@ class ImpalaServer : public ImpalaServiceIf, int GetHS2Port(); typedef boost::unordered_map<std::string, TBackendDescriptor> BackendDescriptorMap; - const BackendDescriptorMap& GetKnownBackends(); + const BackendDescriptorMap GetKnownBackends(); /// Start the shutdown process. Return an error if it could not be started. Otherwise, /// if it was successfully started by this or a previous call, return OK along with @@ -709,10 +710,15 @@ class ImpalaServer : public ImpalaServiceIf, Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user) WARN_UNUSED_RESULT; - // Check if the local backend descriptor is in the list of known backends. If not, add - // it to the list of known backends and add it to the 'topic_updates'. + /// Check if the local backend descriptor is in the list of known backends. If not, add + /// it to the list of known backends and add it to the 'topic_updates'. + /// 'known_backends_lock_' must be held by the caller. void AddLocalBackendToStatestore(std::vector<TTopicDelta>* topic_updates); + /// Takes a set of network addresses of active backends and cancels all the queries + /// running on failed ones (that is, addresses not in the active set). + void CancelQueriesOnFailedBackends(const std::set<TNetworkAddress>& current_membership); + /// Snapshot of a query's state, archived in the query log. struct QueryStateRecord { /// Pretty-printed runtime profile. TODO: Copy actual profile object @@ -1078,6 +1084,9 @@ class ImpalaServer : public ImpalaServiceIf, /// components. BackendDescriptorMap known_backends_; + /// Lock to protect 'known_backends_'. Not held in conjunction with other locks. + boost::mutex known_backends_lock_; + /// Generate unique session id for HiveServer2 session boost::uuids::random_generator uuid_generator_; diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index 871494c..0b33e14 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -142,11 +142,15 @@ class Statestore : public CacheLineAligned { /// Registers a new subscriber with the given unique subscriber ID, running a subscriber /// service at the given location, with the provided list of topic subscriptions. /// The registration_id output parameter is the unique ID for this registration, used to - /// distinguish old registrations from new ones for the same subscriber. - // - /// If a registration already exists for this subscriber, the old registration is removed + /// distinguish old registrations from new ones for the same subscriber. On successful + /// registration, the subscriber is added to the update queue, with an immediate + /// schedule. + /// + /// If a registration already exists for this subscriber, the old registration is + /// removed /// and a new one is created. Subscribers may receive an update intended for the old - /// registration, since one may be in flight when a new RegisterSubscriber() is received. + /// registration, since one may be in flight when a new RegisterSubscriber() is + /// received. Status RegisterSubscriber(const SubscriberId& subscriber_id, const TNetworkAddress& location, const std::vector<TTopicRegistration>& topic_registrations, diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py index 18f83b1..bbc6172 100644 --- a/tests/webserver/test_web_pages.py +++ b/tests/webserver/test_web_pages.py @@ -44,6 +44,7 @@ class TestWebPage(ImpalaTestSuite): JMX_URL = "http://localhost:{0}/jmx" ADMISSION_URL = "http://localhost:{0}/admission" RESET_RESOURCE_POOL_STATS_URL = "http://localhost:{0}/resource_pool_reset" + BACKENDS_URL = "http://localhost:{0}/backends" # log4j changes do not apply to the statestore since it doesn't # have an embedded JVM. So we make two sets of ports to test the @@ -457,3 +458,14 @@ class TestWebPage(ImpalaTestSuite): assert 'resource_pools' in response_json assert len(response_json['resource_pools']) == 1 return response_json['resource_pools'] + + @SkipIfBuildType.remote + def test_backends_page(self): + """Sanity check for the backends debug page's http end point""" + responses = self.get_and_check_status(self.BACKENDS_URL + '?json', + ports_to_test=[25000]) + assert len(responses) == 1 + response_json = json.loads(responses[0].text) + assert 'backends' in response_json + # When this test runs, all impalads would have already started. + assert len(response_json['backends']) == 3 diff --git a/www/admission_controller.tmpl b/www/admission_controller.tmpl index 79d1f7d..edd2fff 100644 --- a/www/admission_controller.tmpl +++ b/www/admission_controller.tmpl @@ -198,8 +198,12 @@ function reset_method(pool_name) { <a href='/admission'> < Show all Resource Pools</a> </p> {{/get_all_pools}} -<p class="lead">This page lists all resource pools to which queries have been submitted - at least once and their corresponding state and statistics.</p> +<p class="lead"> + This page lists all resource pools to which queries have been submitted + at least once and their corresponding state and statistics.<br>See the + <a href='/backends'>backends</a> debug page for memory admitted and reserved per + backend. +</p> {{#resource_pools}} <div class="container-fluid"> <h3><a href='/admission?pool_name={{pool_name}}'>{{pool_name}}</a></h3> diff --git a/www/backends.tmpl b/www/backends.tmpl index 717d4b9..c80d25a 100644 --- a/www/backends.tmpl +++ b/www/backends.tmpl @@ -28,6 +28,8 @@ under the License. <th>Executor</th> <th>Quiescing</th> <th>Memory Limit for Admission</th> + <th>Memory Reserved</th> + <th>Memory Admitted by Queries Submitted to this Coordinator</th> </tr> </thead> <tbody> @@ -38,6 +40,8 @@ under the License. <td>{{is_executor}}</td> <td>{{is_quiescing}}</td> <td>{{admit_mem_limit}}</td> + <td>{{mem_reserved}}</td> + <td>{{mem_admitted}}</td> </tr> {{/backends}} </tbody>