This is an automated email from the ASF dual-hosted git repository.

bikram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new cbddda4  IMPALA-8162: Add memory reserved and admitted to the backends debug page
cbddda4 is described below

commit cbddda46a1b9ff153e3ff9213b5bae7e8da9f19a
Author: Bikramjeet Vig <bikramjeet....@cloudera.com>
AuthorDate: Mon Feb 4 12:44:44 2019 -0800

    IMPALA-8162: Add memory reserved and admitted to the backends debug page
    
    This patch adds the memory reserved and memory admitted per backend to
    the backends debug page. It also fixes a data race in the impala server
    that could potentially crash the impalad if the impala http handler
    tried to access a backend descriptor that was concurrently removed by a
    thread processing a membership update from the statestore.
    
    Testing:
    - Added a sanity check for the backends page.
    - Did manual stress testing by running the e2e tests and constantly
    fetching the backends debug page.
    
    Change-Id: Ibaad7676c5180d8d6dbf6ac84d930c79d66d1fcd
    Reviewed-on: http://gerrit.cloudera.org:8080/12364
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/scheduling/admission-controller.cc |   8 ++
 be/src/scheduling/admission-controller.h  |   6 ++
 be/src/service/impala-http-handler.cc     |  16 +++-
 be/src/service/impala-server.cc           | 129 ++++++++++++++++--------------
 be/src/service/impala-server.h            |  15 +++-
 be/src/statestore/statestore.h            |  12 ++-
 tests/webserver/test_web_pages.py         |  12 +++
 www/admission_controller.tmpl             |   8 +-
 www/backends.tmpl                         |   4 +
 9 files changed, 139 insertions(+), 71 deletions(-)
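
The race fixed here has a familiar shape: GetKnownBackends() used to return a
const reference to known_backends_, which the statestore membership thread
mutates concurrently, so the http handler could iterate the map while entries
were being erased under it. Below is a minimal standalone sketch of the hazard
and of the copy-under-lock fix; the types are simplified stand-ins, not
Impala's actual classes.

    #include <mutex>
    #include <string>
    #include <unordered_map>

    // Simplified stand-ins for ImpalaServer's members; names are illustrative.
    struct BackendDescriptor { std::string address; };
    using BackendMap = std::unordered_map<std::string, BackendDescriptor>;

    class Server {
     public:
      // UNSAFE: the caller iterates the returned reference without holding
      // the lock while another thread may erase entries, invalidating the
      // caller's iterators.
      const BackendMap& GetKnownBackendsUnsafe() { return known_backends_; }

      // SAFE: snapshot the map under the lock and hand the caller its own
      // copy, mirroring what this patch does to GetKnownBackends().
      BackendMap GetKnownBackends() {
        std::lock_guard<std::mutex> l(known_backends_lock_);
        return known_backends_;
      }

      // Concurrent mutation, e.g. from a statestore membership update.
      void RemoveBackend(const std::string& key) {
        std::lock_guard<std::mutex> l(known_backends_lock_);
        known_backends_.erase(key);
      }

     private:
      std::mutex known_backends_lock_;
      BackendMap known_backends_;
    };

The copy is taken once per /backends page load, so the extra allocation is
negligible next to the use-after-free it prevents.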

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 848060e..77e6d4b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -1261,4 +1261,12 @@ void AdmissionController::PoolStats::InitMetrics() {
   metrics_.clamp_mem_limit_query_option = parent_->metrics_group_->AddProperty<bool>(
       POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT, false, name_);
 }
+
+void AdmissionController::PopulatePerHostMemReservedAndAdmitted(
+    std::unordered_map<string, pair<int64_t, int64_t>>* mem_map) {
+  lock_guard<mutex> l(admission_ctrl_lock_);
+  for (const auto& elem: host_mem_reserved_) {
+    (*mem_map)[elem.first] = make_pair(elem.second, host_mem_admitted_[elem.first]);
+  }
+}
 }
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 0fa1596..c13d65f 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -271,6 +271,12 @@ class AdmissionController {
   /// Calls ResetInformationalStats on all pools.
   void ResetAllPoolInformationalStats();
 
+  // Populates the input map with the per host memory reserved and admitted in the
+  // following format: <host_address_str, pair<mem_reserved, mem_admitted>>.
+  // Only used for populating the 'backends' debug page.
+  void PopulatePerHostMemReservedAndAdmitted(
+      std::unordered_map<std::string, std::pair<int64_t, int64_t>>* mem_map);
+
  private:
   class PoolStats;
   friend class PoolStats;
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 394c41e..424e5cf 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -32,10 +32,10 @@
 #include "runtime/query-state.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
+#include "scheduling/admission-controller.h"
 #include "service/impala-server.h"
 #include "service/client-request-state.h"
 #include "service/frontend.h"
-#include "scheduling/admission-controller.h"
 #include "thrift/protocol/TDebugProtocol.h"
 #include "util/coding-util.h"
 #include "util/logging-support.h"
@@ -845,11 +845,15 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include
 
 void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args,
     Document* document) {
+  std::unordered_map<string, pair<int64_t, int64_t>> host_mem_map;
+  ExecEnv::GetInstance()->admission_controller()->PopulatePerHostMemReservedAndAdmitted(
+      &host_mem_map);
   Value backends_list(kArrayType);
   for (const auto& entry : server_->GetKnownBackends()) {
     TBackendDescriptor backend = entry.second;
     Value backend_obj(kObjectType);
-    Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator());
+    string address = TNetworkAddressToString(backend.address);
+    Value str(address.c_str(), document->GetAllocator());
     backend_obj.AddMember("address", str, document->GetAllocator());
     backend_obj.AddMember("is_coordinator", backend.is_coordinator,
         document->GetAllocator());
@@ -858,6 +862,14 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args,
     Value admit_mem_limit(PrettyPrinter::PrintBytes(backend.admit_mem_limit).c_str(),
         document->GetAllocator());
     backend_obj.AddMember("admit_mem_limit", admit_mem_limit, document->GetAllocator());
+    // If the host address does not exist in 'host_mem_map', operator[] ensures that a
+    // value of zero is used for those addresses.
+    Value mem_reserved(PrettyPrinter::PrintBytes(host_mem_map[address].first).c_str(),
+        document->GetAllocator());
+    backend_obj.AddMember("mem_reserved", mem_reserved, document->GetAllocator());
+    Value mem_admitted(PrettyPrinter::PrintBytes(host_mem_map[address].second).c_str(),
+        document->GetAllocator());
+    backend_obj.AddMember("mem_admitted", mem_admitted, document->GetAllocator());
     backends_list.PushBack(backend_obj, document->GetAllocator());
   }
   document->AddMember("backends", backends_list, document->GetAllocator());
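
Note that the handler indexes host_mem_map with operator[], which is exactly
what the comment above relies on: a missing key is value-initialized, so the
pair<int64_t, int64_t> for an unknown address reads as {0, 0}. A
self-contained illustration (the addresses are hypothetical):

    #include <cassert>
    #include <cstdint>
    #include <string>
    #include <unordered_map>
    #include <utility>

    int main() {
      std::unordered_map<std::string, std::pair<int64_t, int64_t>> host_mem_map;
      host_mem_map["host-a:22000"] = {1 << 20, 2 << 20};

      // operator[] default-constructs the mapped value for a missing key, so
      // a backend with no admission-controller entry reads as {0, 0} rather
      // than being skipped or throwing.
      const auto& mem = host_mem_map["host-b:22000"];
      assert(mem.first == 0 && mem.second == 0);
      return 0;
    }
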
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 5b18cdd..cf16563 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -478,7 +478,8 @@ int ImpalaServer::GetHS2Port() {
   return hs2_server_->port();
 }
 
-const ImpalaServer::BackendDescriptorMap& ImpalaServer::GetKnownBackends() {
+const ImpalaServer::BackendDescriptorMap ImpalaServer::GetKnownBackends() {
+  lock_guard<mutex> l(known_backends_lock_);
   return known_backends_;
 }
 
@@ -1710,15 +1711,20 @@ void ImpalaServer::MembershipCallback(
   // statestore heartbeat less frequently.
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
       incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
+  if (topic == incoming_topic_deltas.end()) return;
 
-  if (topic != incoming_topic_deltas.end()) {
-    const TTopicDelta& delta = topic->second;
+  const TTopicDelta& delta = topic->second;
+  // Create a set of known backend network addresses. Used to test for cluster
+  // membership by network address.
+  set<TNetworkAddress> current_membership;
+  {
+    lock_guard<mutex> l(known_backends_lock_);
     // If this is not a delta, the update should include all entries in the topic so
     // clear the saved mapping of known backends.
     if (!delta.is_delta) known_backends_.clear();
 
     // Process membership additions/deletions.
-    for (const TTopicItem& item: delta.topic_entries) {
+    for (const TTopicItem& item : delta.topic_entries) {
       if (item.deleted) {
         auto entry = known_backends_.find(item.key);
         // Remove stale connections to removed members.
@@ -1730,8 +1736,9 @@ void ImpalaServer::MembershipCallback(
       }
       uint32_t len = item.value.size();
       TBackendDescriptor backend_descriptor;
-      Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
-          item.value.data()), &len, false, &backend_descriptor);
+      Status status =
+          DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(item.value.data()),
+              &len, false, &backend_descriptor);
       if (!status.ok()) {
         VLOG(2) << "Error deserializing topic item with key: " << item.key;
         continue;
@@ -1745,18 +1752,15 @@ void ImpalaServer::MembershipCallback(
       }
     }
 
-    // Register the local backend in the statestore and update the list of known backends.
-    // Only register if all ports have been opened and are ready.
+    // Register the local backend in the statestore and update the list of known
+    // backends. Only register if all ports have been opened and are ready.
     if (services_started_.load()) AddLocalBackendToStatestore(subscriber_topic_updates);
 
-    // Create a set of known backend network addresses. Used to test for cluster
-    // membership by network address.
-    set<TNetworkAddress> current_membership;
     // Also reflect changes to the frontend. Initialized only if any_changes is true.
     // Only send the hostname and ip_address of the executors to the frontend.
     TUpdateExecutorMembershipRequest update_req;
     bool any_changes = !delta.topic_entries.empty() || !delta.is_delta;
-    for (const BackendDescriptorMap::value_type& backend: known_backends_) {
+    for (const BackendDescriptorMap::value_type& backend : known_backends_) {
       current_membership.insert(backend.second.address);
       if (any_changes && backend.second.is_executor) {
         update_req.hostnames.insert(backend.second.address.hostname);
@@ -1771,60 +1775,65 @@ void ImpalaServer::MembershipCallback(
                      << status.GetDetail();
       }
     }
+  }
 
-    // Maps from query id (to be cancelled) to a list of failed Impalads that are
-    // the cause of the cancellation.
-    map<TUniqueId, vector<TNetworkAddress>> queries_to_cancel;
-    {
-      // Build a list of queries that are running on failed hosts (as evidenced by their
-      // absence from the membership list).
-      // TODO: crash-restart failures can give false negatives for failed Impala demons.
-      lock_guard<mutex> l(query_locations_lock_);
-      QueryLocations::const_iterator loc_entry = query_locations_.begin();
-      while (loc_entry != query_locations_.end()) {
-        if (current_membership.find(loc_entry->first) == current_membership.end()) {
-          unordered_set<TUniqueId>::const_iterator query_id = loc_entry->second.begin();
-          // Add failed backend locations to all queries that ran on that backend.
-          for(; query_id != loc_entry->second.end(); ++query_id) {
-            vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id];
-            failed_hosts.push_back(loc_entry->first);
-          }
-          // We can remove the location wholesale once we know backend's failed. To do so
-          // safely during iteration, we have to be careful not in invalidate the current
-          // iterator, so copy the iterator to do the erase(..) and advance the original.
-          QueryLocations::const_iterator failed_backend = loc_entry;
-          ++loc_entry;
-          query_locations_.erase(failed_backend);
-        } else {
-          ++loc_entry;
+  CancelQueriesOnFailedBackends(current_membership);
+}
+
+void ImpalaServer::CancelQueriesOnFailedBackends(
+    const set<TNetworkAddress>& current_membership) {
+  // Maps from query id (to be cancelled) to a list of failed Impalads that are
+  // the cause of the cancellation.
+  map<TUniqueId, vector<TNetworkAddress>> queries_to_cancel;
+  {
+    // Build a list of queries that are running on failed hosts (as evidenced by their
+    // absence from the membership list).
+    // TODO: crash-restart failures can give false negatives for failed Impala daemons.
+    lock_guard<mutex> l(query_locations_lock_);
+    QueryLocations::const_iterator loc_entry = query_locations_.begin();
+    while (loc_entry != query_locations_.end()) {
+      if (current_membership.find(loc_entry->first) == current_membership.end()) {
+        unordered_set<TUniqueId>::const_iterator query_id = loc_entry->second.begin();
+        // Add failed backend locations to all queries that ran on that backend.
+        for(; query_id != loc_entry->second.end(); ++query_id) {
+          vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id];
+          failed_hosts.push_back(loc_entry->first);
         }
+        // We can remove the location wholesale once we know the backend has failed.
+        // To do so safely during iteration, we have to be careful not to invalidate
+        // the current iterator, so copy the iterator to do the erase(..) and advance
+        // the original.
+        QueryLocations::const_iterator failed_backend = loc_entry;
+        ++loc_entry;
+        query_locations_.erase(failed_backend);
+      } else {
+        ++loc_entry;
       }
     }
+  }
 
-    if (cancellation_thread_pool_->GetQueueSize() + queries_to_cancel.size() >
-        MAX_CANCELLATION_QUEUE_SIZE) {
-      // Ignore the cancellations - we'll be able to process them on the next heartbeat
-      // instead.
-      LOG_EVERY_N(WARNING, 60) << "Cancellation queue is full";
-    } else {
-      // Since we are the only producer for this pool, we know that this cannot block
-      // indefinitely since the queue is large enough to accept all new cancellation
-      // requests.
-      map<TUniqueId, vector<TNetworkAddress>>::iterator cancellation_entry;
-      for (cancellation_entry = queries_to_cancel.begin();
-          cancellation_entry != queries_to_cancel.end();
-          ++cancellation_entry) {
-        stringstream backends_ss;
-        for (int i = 0; i < cancellation_entry->second.size(); ++i) {
-          backends_ss << TNetworkAddressToString(cancellation_entry->second[i]);
-          if (i + 1 != cancellation_entry->second.size()) backends_ss << ", ";
-        }
-        VLOG_QUERY << "Backends failed for query " << PrintId(cancellation_entry->first)
-                   << ", adding to queue to check for cancellation: "
-                   << backends_ss.str();
-        cancellation_thread_pool_->Offer(CancellationWork::BackendFailure(
-            cancellation_entry->first, cancellation_entry->second));
+  if (cancellation_thread_pool_->GetQueueSize() + queries_to_cancel.size() >
+      MAX_CANCELLATION_QUEUE_SIZE) {
+    // Ignore the cancellations - we'll be able to process them on the next heartbeat
+    // instead.
+    LOG_EVERY_N(WARNING, 60) << "Cancellation queue is full";
+  } else {
+    // Since we are the only producer for this pool, we know that this cannot block
+    // indefinitely since the queue is large enough to accept all new cancellation
+    // requests.
+    map<TUniqueId, vector<TNetworkAddress>>::iterator cancellation_entry;
+    for (cancellation_entry = queries_to_cancel.begin();
+        cancellation_entry != queries_to_cancel.end();
+        ++cancellation_entry) {
+      stringstream backends_ss;
+      for (int i = 0; i < cancellation_entry->second.size(); ++i) {
+        backends_ss << TNetworkAddressToString(cancellation_entry->second[i]);
+        if (i + 1 != cancellation_entry->second.size()) backends_ss << ", ";
       }
+      VLOG_QUERY << "Backends failed for query " << PrintId(cancellation_entry->first)
+                 << ", adding to queue to check for cancellation: "
+                 << backends_ss.str();
+      cancellation_thread_pool_->Offer(CancellationWork::BackendFailure(
+          cancellation_entry->first, cancellation_entry->second));
     }
   }
 }
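
The extracted CancelQueriesOnFailedBackends() keeps the pre-C++11 copy-advance
idiom for erasing from a map while iterating it. A condensed sketch of that
idiom next to the equivalent C++11 form, with simplified key types rather than
Impala's:

    #include <string>
    #include <unordered_map>
    #include <unordered_set>

    using Locations = std::unordered_map<std::string, std::unordered_set<int>>;

    // Copy-advance, as in the code above: duplicate the iterator, advance the
    // original past the doomed entry, then erase through the copy so the live
    // iterator is never invalidated.
    void EraseDeadCopyAdvance(Locations* locs,
                              const std::unordered_set<std::string>& live) {
      Locations::const_iterator it = locs->begin();
      while (it != locs->end()) {
        if (live.count(it->first) == 0) {
          Locations::const_iterator dead = it;
          ++it;
          locs->erase(dead);
        } else {
          ++it;
        }
      }
    }

    // Equivalent since C++11: erase() returns the iterator following the
    // erased element.
    void EraseDeadModern(Locations* locs,
                         const std::unordered_set<std::string>& live) {
      for (auto it = locs->begin(); it != locs->end();) {
        if (live.count(it->first) == 0) {
          it = locs->erase(it);
        } else {
          ++it;
        }
      }
    }
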
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index dd7d221..69c1f00 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -156,6 +156,7 @@ class QuerySchedule;
 /// * uuid_lock_
 /// * catalog_version_lock_
 /// * connection_to_sessions_map_lock_
+/// * known_backends_lock_
 ///
 /// TODO: The same doesn't apply to the execution state of an individual plan
 /// fragment: the originating coordinator might die, but we can get notified of
@@ -401,7 +402,7 @@ class ImpalaServer : public ImpalaServiceIf,
   int GetHS2Port();
 
   typedef boost::unordered_map<std::string, TBackendDescriptor> BackendDescriptorMap;
-  const BackendDescriptorMap& GetKnownBackends();
+  const BackendDescriptorMap GetKnownBackends();
 
   /// Start the shutdown process. Return an error if it could not be started. Otherwise,
   /// if it was successfully started by this or a previous call, return OK along with
@@ -709,10 +710,15 @@ class ImpalaServer : public ImpalaServiceIf,
   Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user)
       WARN_UNUSED_RESULT;
 
-  // Check if the local backend descriptor is in the list of known backends. If not, add
-  // it to the list of known backends and add it to the 'topic_updates'.
+  /// Check if the local backend descriptor is in the list of known backends. If not, add
+  /// it to the list of known backends and add it to the 'topic_updates'.
+  /// 'known_backends_lock_' must be held by the caller.
   void AddLocalBackendToStatestore(std::vector<TTopicDelta>* topic_updates);
 
+  /// Takes a set of network addresses of active backends and cancels all the queries
+  /// running on failed ones (that is, addresses not in the active set).
+  void CancelQueriesOnFailedBackends(const std::set<TNetworkAddress>& current_membership);
+
   /// Snapshot of a query's state, archived in the query log.
   struct QueryStateRecord {
     /// Pretty-printed runtime profile. TODO: Copy actual profile object
@@ -1078,6 +1084,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// components.
   BackendDescriptorMap known_backends_;
 
+  /// Lock to protect 'known_backends_'. Not held in conjunction with other locks.
+  boost::mutex known_backends_lock_;
+
   /// Generate unique session id for HiveServer2 session
   boost::uuids::random_generator uuid_generator_;
 
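The lock list at the top of this header documents a single global acquisition
order; threads that honor it cannot form an ABBA cycle. The new
known_backends_lock_ takes the even simpler route of never being held together
with another lock. A toy illustration of the ordering rule, with hypothetical
mutex names:

    #include <mutex>

    std::mutex lock_a;  // earlier in the documented order
    std::mutex lock_b;  // later in the documented order

    // Every thread that needs both locks takes them in the same order, so the
    // classic ABBA deadlock (one thread holds A waiting on B while another
    // holds B waiting on A) cannot occur.
    void DoWorkUnderBoth() {
      std::lock_guard<std::mutex> a(lock_a);
      std::lock_guard<std::mutex> b(lock_b);
      // ... touch state guarded by both locks ...
    }
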
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 871494c..0b33e14 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -142,11 +142,15 @@ class Statestore : public CacheLineAligned {
   /// Registers a new subscriber with the given unique subscriber ID, running a subscriber
   /// service at the given location, with the provided list of topic subscriptions.
   /// The registration_id output parameter is the unique ID for this registration, used to
-  /// distinguish old registrations from new ones for the same subscriber.
-  //
-  /// If a registration already exists for this subscriber, the old registration is removed
+  /// distinguish old registrations from new ones for the same subscriber. On successful
+  /// registration, the subscriber is added to the update queue, with an immediate
+  /// schedule.
+  ///
+  /// If a registration already exists for this subscriber, the old registration is
+  /// removed
   /// and a new one is created. Subscribers may receive an update intended for the old
-  /// registration, since one may be in flight when a new RegisterSubscriber() is received.
+  /// registration, since one may be in flight when a new RegisterSubscriber() is
+  /// received.
   Status RegisterSubscriber(const SubscriberId& subscriber_id,
       const TNetworkAddress& location,
       const std::vector<TTopicRegistration>& topic_registrations,
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 18f83b1..bbc6172 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -44,6 +44,7 @@ class TestWebPage(ImpalaTestSuite):
   JMX_URL = "http://localhost:{0}/jmx"
   ADMISSION_URL = "http://localhost:{0}/admission"
   RESET_RESOURCE_POOL_STATS_URL = "http://localhost:{0}/resource_pool_reset"
+  BACKENDS_URL = "http://localhost:{0}/backends"
 
   # log4j changes do not apply to the statestore since it doesn't
   # have an embedded JVM. So we make two sets of ports to test the
@@ -457,3 +458,14 @@ class TestWebPage(ImpalaTestSuite):
     assert 'resource_pools' in response_json
     assert len(response_json['resource_pools']) == 1
     return response_json['resource_pools']
+
+  @SkipIfBuildType.remote
+  def test_backends_page(self):
+    """Sanity check for the backends debug page's http end point"""
+    responses = self.get_and_check_status(self.BACKENDS_URL + '?json',
+                                          ports_to_test=[25000])
+    assert len(responses) == 1
+    response_json = json.loads(responses[0].text)
+    assert 'backends' in response_json
+    # By the time this test runs, all impalads will have already started.
+    assert len(response_json['backends']) == 3
diff --git a/www/admission_controller.tmpl b/www/admission_controller.tmpl
index 79d1f7d..edd2fff 100644
--- a/www/admission_controller.tmpl
+++ b/www/admission_controller.tmpl
@@ -198,8 +198,12 @@ function reset_method(pool_name) {
   <a href='/admission'> < Show all Resource Pools</a>
 </p>
 {{/get_all_pools}}
-<p class="lead">This page lists all resource pools to which queries have been submitted
-  at least once and their corresponding state and statistics.</p>
+<p class="lead">
+  This page lists all resource pools to which queries have been submitted
+  at least once and their corresponding state and statistics.<br>See the
+  <a href='/backends'>backends</a> debug page for memory admitted and reserved per
+  backend.
+</p>
 {{#resource_pools}}
 <div class="container-fluid">
   <h3><a href='/admission?pool_name={{pool_name}}'>{{pool_name}}</a></h3>
diff --git a/www/backends.tmpl b/www/backends.tmpl
index 717d4b9..c80d25a 100644
--- a/www/backends.tmpl
+++ b/www/backends.tmpl
@@ -28,6 +28,8 @@ under the License.
       <th>Executor</th>
       <th>Quiescing</th>
       <th>Memory Limit for Admission</th>
+      <th>Memory Reserved</th>
+      <th>Memory Admitted by Queries Submitted to this Coordinator</th>
     </tr>
   </thead>
   <tbody>
@@ -38,6 +40,8 @@ under the License.
       <td>{{is_executor}}</td>
       <td>{{is_quiescing}}</td>
       <td>{{admit_mem_limit}}</td>
+      <td>{{mem_reserved}}</td>
+      <td>{{mem_admitted}}</td>
     </tr>
     {{/backends}}
   </tbody>
