This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a3631  Add metrics for tracking bp caused by local instances and remote stmgrs (#3362)
08a3631 is described below

commit 08a3631913e36df92aa45f4e637ac64d4162382a
Author: Ning Wang <[email protected]>
AuthorDate: Wed Oct 9 11:46:27 2019 -0700

    Add metrics for tracking bp caused by local instances and remote stmgrs (#3362)
---
 heron/stmgr/src/cpp/manager/instance-server.cpp | 19 +++++++++++++++++--
 heron/stmgr/src/cpp/manager/instance-server.h   |  5 +++--
 heron/stmgr/src/cpp/manager/stmgr-server.cpp    | 22 ++++++++++++++++++----
 heron/stmgr/src/cpp/manager/stmgr-server.h      |  4 +++-
 4 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp 
b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 9a752de..047c634 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -67,13 +67,16 @@ const sp_string METRIC_FAIL_TUPLES_TO_INSTANCES_LOST = 
"__fail_tuples_to_workers
 // Num bytes lost since instances is not connected
 const sp_string METRIC_BYTES_TO_INSTANCES_LOST = "__bytes_to_workers_lost";
 
+// Time spent in back pressure caused by instances managed by this stmgr.
+const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE =
+    "__time_spent_back_pressure_by_local_instance";
 // Time spent in back pressure aggregated - back pressure initiated by us +
 // others
 const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_AGGR = 
"__server/__time_spent_back_pressure_aggr";
 // Time spent in back pressure because of a component id. The comp id will be
-// appended
-// to the string below
+// appended to the string below
 const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_COMPID = 
"__time_spent_back_pressure_by_compid/";
+
 // Prefix for connection buffer's metrics
 const sp_string CONNECTION_BUFFER_BY_INSTANCEID = 
"__connection_buffer_by_instanceid/";
 // Prefix for connection buffer's length metrics. This is different
@@ -117,6 +120,9 @@ InstanceServer::InstanceServer(
   metrics_manager_client_->register_metric("__server", 
instance_server_metrics_);
   
metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR,
                                            back_pressure_metric_aggr_);
+  back_pressure_metric_caused_by_local_instances_ = 
make_shared<heron::common::TimeSpentMetric>();
+  
metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE,
+                                           
back_pressure_metric_caused_by_local_instances_);
   spouts_under_back_pressure_ = false;
 
   // Update queue related metrics every 10 seconds
@@ -180,6 +186,8 @@ InstanceServer::~InstanceServer() {
 
   metrics_manager_client_->unregister_metric("__server");
   
metrics_manager_client_->unregister_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR);
+  metrics_manager_client_->unregister_metric(
+      METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE);
 
   // cleanup the instance info
   for (auto iter = instance_info_.begin(); iter != instance_info_.end(); 
++iter) {
@@ -549,6 +557,8 @@ sp_string InstanceServer::GetInstanceName(Connection* 
_connection) {
   return "";
 }
 
+// This function is called when the buffer in the connection is full (the 
instance is not consuming
+// tuples fast enough).
 void InstanceServer::StartBackPressureConnectionCb(Connection* _connection) {
   // The connection will notify us when we can stop the back pressure
   _connection->setCausedBackPressure();
@@ -568,6 +578,8 @@ void 
InstanceServer::StartBackPressureConnectionCb(Connection* _connection) {
 
   if (!stmgr_->DidAnnounceBackPressure()) {
     stmgr_->SendStartBackPressureToOtherStMgrs();
+    // Start backpressure from local instances metric
+    back_pressure_metric_caused_by_local_instances_->Start();
   }
 
   // Indicate which instance component had back pressure
@@ -578,6 +590,7 @@ void 
InstanceServer::StartBackPressureConnectionCb(Connection* _connection) {
   StartBackPressureOnSpouts();
 }
 
+// This function is called when the buffer in the connection is empty (the 
tuples are drained).
 void InstanceServer::StopBackPressureConnectionCb(Connection* _connection) {
   _connection->unsetCausedBackPressure();
 
@@ -605,6 +618,8 @@ void 
InstanceServer::StopBackPressureConnectionCb(Connection* _connection) {
 
   if (!stmgr_->DidAnnounceBackPressure()) {
     stmgr_->SendStopBackPressureToOtherStMgrs();
+    // Stop backpressure from local instances metric
+    back_pressure_metric_caused_by_local_instances_->Stop();
   }
   AttemptStopBackPressureFromSpouts();
 }
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h 
b/heron/stmgr/src/cpp/manager/instance-server.h
index b4a6038..e3a1b82 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -31,11 +31,11 @@
 
 namespace heron {
 namespace common {
+class AssignableMetric;
 class MetricsMgrSt;
 class MultiCountMetric;
-class TimeSpentMetric;
-class AssignableMetric;
 class MultiMeanMetric;
+class TimeSpentMetric;
 }
 }
 
@@ -196,6 +196,7 @@ class InstanceServer : public Server {
   shared_ptr<heron::common::MetricsMgrSt> metrics_manager_client_;
   shared_ptr<heron::common::MultiCountMetric> instance_server_metrics_;
   shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_aggr_;
+  shared_ptr<heron::common::TimeSpentMetric> 
back_pressure_metric_caused_by_local_instances_;
 
   bool spouts_under_back_pressure_;
 
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp 
b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 12e573e..f16afca 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -47,6 +47,10 @@ const sp_string METRIC_ACK_TUPLES_FROM_STMGRS = 
"__ack_tuples_from_stmgrs";
 const sp_string METRIC_FAIL_TUPLES_FROM_STMGRS = "__fail_tuples_from_stmgrs";
 // Bytes received from other stream managers
 const sp_string METRIC_BYTES_FROM_STMGRS = "__bytes_from_stmgrs";
+// Time spent in back pressure caused by remote stream managers.
+const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_REMOTE_STMGR =
+    "__time_spent_back_pressure_by_remote_stmgr";
+
 
 StMgrServer::StMgrServer(shared_ptr<EventLoop> eventLoop, const 
NetworkOptions& _options,
                          const sp_string& _topology_name, const sp_string& 
_topology_id,
@@ -80,6 +84,10 @@ StMgrServer::StMgrServer(shared_ptr<EventLoop> eventLoop, 
const NetworkOptions&
   bytes_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
   metrics_manager_client_->register_metric(SERVER_SCOPE + 
METRIC_BYTES_FROM_STMGRS,
                                            bytes_from_stmgrs_metrics_);
+  back_pressure_metric_caused_by_remote_stmgr_ = 
make_shared<heron::common::TimeSpentMetric>();
+  metrics_manager_client_->register_metric(
+      SERVER_SCOPE + METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_REMOTE_STMGR,
+      back_pressure_metric_caused_by_remote_stmgr_);
 }
 
 StMgrServer::~StMgrServer() {
@@ -88,6 +96,8 @@ StMgrServer::~StMgrServer() {
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + 
METRIC_ACK_TUPLES_FROM_STMGRS);
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + 
METRIC_FAIL_TUPLES_FROM_STMGRS);
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + 
METRIC_BYTES_FROM_STMGRS);
+  metrics_manager_client_->unregister_metric(
+      SERVER_SCOPE + METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_REMOTE_STMGR);
 }
 
 void StMgrServer::HandleNewConnection(Connection* _conn) {
@@ -180,10 +190,12 @@ void StMgrServer::HandleTupleStreamMessage(Connection* 
_conn,
 void StMgrServer::StartBackPressureClientCb(const sp_string& _other_stmgr_id) {
   if (!stmgr_->DidAnnounceBackPressure()) {
     stmgr_->SendStartBackPressureToOtherStMgrs();
+    // Start backpressure from remote stmgr metric
+    back_pressure_metric_caused_by_remote_stmgr_->Start();
   }
   remote_ends_who_caused_back_pressure_.insert(_other_stmgr_id);
-  LOG(INFO) << "We observe back pressure on sending data to remote stream 
manager "
-            << _other_stmgr_id;
+  LOG(WARNING) << "We observe back pressure on sending data to remote stream 
manager "
+               << _other_stmgr_id;
   stmgr_->StartBackPressureOnSpouts();
 }
 
@@ -194,9 +206,11 @@ void StMgrServer::StopBackPressureClientCb(const 
sp_string& _other_stmgr_id) {
 
   if (!stmgr_->DidAnnounceBackPressure()) {
     stmgr_->SendStopBackPressureToOtherStMgrs();
+    // Stop backpressure from remote stmgr metric
+    back_pressure_metric_caused_by_remote_stmgr_->Stop();
   }
-  LOG(INFO) << "We don't observe back pressure now on sending data to remote "
-               "stream manager "
+  LOG(WARNING) << "We don't observe back pressure now on sending data to 
remote "
+                  "stream manager "
             << _other_stmgr_id;
   if (!stmgr_->DidAnnounceBackPressure() && 
!stmgr_->DidOthersAnnounceBackPressure()) {
     stmgr_->AttemptStopBackPressureFromSpouts();
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h 
b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 325f0c1..d0baf69 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -31,8 +31,9 @@
 
 namespace heron {
 namespace common {
-class MetricsMgrSt;
 class CountMetric;
+class MetricsMgrSt;
+class TimeSpentMetric;
 }
 }
 
@@ -107,6 +108,7 @@ class StMgrServer : public Server {
   shared_ptr<heron::common::CountMetric> ack_tuples_from_stmgrs_metrics_;
   shared_ptr<heron::common::CountMetric> fail_tuples_from_stmgrs_metrics_;
   shared_ptr<heron::common::CountMetric> bytes_from_stmgrs_metrics_;
+  shared_ptr<heron::common::TimeSpentMetric> 
back_pressure_metric_caused_by_remote_stmgr_;
 };
 
 }  // namespace stmgr

Reply via email to