IMPALA-6576: Add metrics for data stream service memory usage

This change adds metrics for the memory usage of the data stream service. Both current and peak usage are exposed.
It adds a test to test_krpc_metrics.py to make sure that the expected metrics are present and that the peak usage shows a non-zero value after running a query. Change-Id: I5033b8dfda0b23d4230535ba13c3e050a35d01a3 Reviewed-on: http://gerrit.cloudera.org:8080/9562 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8283081b Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8283081b Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8283081b Branch: refs/heads/master Commit: 8283081bf089b7ec61ce7c28f4e91e9acd157980 Parents: 8e1bf0e Author: Lars Volker <l...@cloudera.com> Authored: Thu Mar 8 17:55:11 2018 -0800 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Tue Mar 13 02:01:15 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/exec-env.cc | 2 +- be/src/service/data-stream-service.cc | 4 ++- be/src/service/data-stream-service.h | 4 +-- be/src/util/memory-metrics.cc | 29 ++++++++++++++++++++ be/src/util/memory-metrics.h | 25 +++++++++++++++++ common/thrift/metrics.json | 20 ++++++++++++++ tests/custom_cluster/test_krpc_metrics.py | 37 +++++++++++++++++--------- 7 files changed, 104 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/8283081b/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index e64cc81..57e1406 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -302,7 +302,7 @@ Status ExecEnv::Init() { // Initialization needs to happen in the following order due to dependencies: // - RPC manager, DataStreamService and DataStreamManager. 
RETURN_IF_ERROR(rpc_mgr_->Init()); - data_svc_.reset(new DataStreamService()); + data_svc_.reset(new DataStreamService(rpc_metrics_)); RETURN_IF_ERROR(data_svc_->Init()); RETURN_IF_ERROR(KrpcStreamMgr()->Init(data_svc_->mem_tracker())); // Bump thread cache to 1GB to reduce contention for TCMalloc central http://git-wip-us.apache.org/repos/asf/impala/blob/8283081b/be/src/service/data-stream-service.cc ---------------------------------------------------------------------- diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc index b04105b..d94837b 100644 --- a/be/src/service/data-stream-service.cc +++ b/be/src/service/data-stream-service.cc @@ -27,6 +27,7 @@ #include "runtime/krpc-data-stream-mgr.h" #include "runtime/exec-env.h" #include "runtime/row-batch.h" +#include "util/memory-metrics.h" #include "util/parse-util.h" #include "testutil/fault-injection-util.h" @@ -45,7 +46,7 @@ DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for proce namespace impala { -DataStreamService::DataStreamService() +DataStreamService::DataStreamService(MetricGroup* metric_group) : DataStreamServiceIf(ExecEnv::GetInstance()->rpc_mgr()->metric_entity(), ExecEnv::GetInstance()->rpc_mgr()->result_tracker()) { MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker(); @@ -54,6 +55,7 @@ DataStreamService::DataStreamService() &is_percent, process_mem_tracker->limit()); mem_tracker_.reset(new MemTracker( bytes_limit, "Data Stream Service Queue", process_mem_tracker)); + MemTrackerMetric::CreateMetrics(metric_group, mem_tracker_.get(), "DataStreamService"); } Status DataStreamService::Init() { http://git-wip-us.apache.org/repos/asf/impala/blob/8283081b/be/src/service/data-stream-service.h ---------------------------------------------------------------------- diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h index cba27ae..812fb2c 100644 --- 
a/be/src/service/data-stream-service.h +++ b/be/src/service/data-stream-service.h @@ -37,10 +37,10 @@ class RpcMgr; /// instances. The client for this service is implemented in KrpcDataStreamSender. /// The processing of incoming requests is implemented in KrpcDataStreamRecvr. /// KrpcDataStreamMgr is responsible for routing the incoming requests to the -/// appropriate receivers. +/// appropriate receivers. Metrics exposed by the service will be added to 'metric_group'. class DataStreamService : public DataStreamServiceIf { public: - DataStreamService(); + DataStreamService(MetricGroup* metric_group); /// Initializes the service by registering it with the singleton RPC manager. /// This mustn't be called until RPC manager has been initialized. http://git-wip-us.apache.org/repos/asf/impala/blob/8283081b/be/src/util/memory-metrics.cc ---------------------------------------------------------------------- diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc index fd78343..d37fb3a 100644 --- a/be/src/util/memory-metrics.cc +++ b/be/src/util/memory-metrics.cc @@ -22,6 +22,7 @@ #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/mem-tracker.h" #include "util/jni-util.h" #include "util/mem-info.h" #include "util/time.h" @@ -300,3 +301,31 @@ int64_t BufferPoolMetric::GetValue() { } return 0; } + +void MemTrackerMetric::CreateMetrics(MetricGroup* metrics, MemTracker* mem_tracker, + const string& name) { + metrics->RegisterMetric( + new MemTrackerMetric(MetricDefs::Get("mem-tracker.$0.current_usage_bytes", name), + MemTrackerMetricType::CURRENT, mem_tracker)); + metrics->RegisterMetric( + new MemTrackerMetric(MetricDefs::Get("mem-tracker.$0.peak_usage_bytes", name), + MemTrackerMetricType::PEAK, mem_tracker)); +} + +MemTrackerMetric::MemTrackerMetric(const TMetricDef& def, MemTrackerMetricType type, + MemTracker* mem_tracker) + : IntGauge(def, 0), + type_(type), + mem_tracker_(mem_tracker) 
{} + +int64_t MemTrackerMetric::GetValue() { + switch (type_) { + case MemTrackerMetricType::CURRENT: + return mem_tracker_->consumption(); + case MemTrackerMetricType::PEAK: + return mem_tracker_->peak_consumption(); + default: + DCHECK(false) << "Unknown MemTrackerMetricType: " << static_cast<int>(type_); + } + return 0; +} http://git-wip-us.apache.org/repos/asf/impala/blob/8283081b/be/src/util/memory-metrics.h ---------------------------------------------------------------------- diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h index 0ac04bf..ed0e889 100644 --- a/be/src/util/memory-metrics.h +++ b/be/src/util/memory-metrics.h @@ -33,6 +33,7 @@ namespace impala { class BufferPool; +class MemTracker; class ReservationTracker; class Thread; @@ -240,6 +241,30 @@ class BufferPoolMetric : public IntGauge { BufferPool* buffer_pool_; }; +/// Metric that reports information about a MemTracker. +class MemTrackerMetric : public IntGauge { + public: + // Creates two new metrics tracking the current and peak usages of 'mem_tracker' in + // the metrics group 'metrics'. The caller must make sure that 'mem_tracker' is not + // destructed before 'metrics'. + static void CreateMetrics(MetricGroup* metrics, MemTracker* mem_tracker, + const std::string& name); + + virtual int64_t GetValue() override; + + private: + enum class MemTrackerMetricType { + CURRENT, // Current usage of the MemTracker + PEAK, // Peak usage of the MemTracker + }; + + MemTrackerMetric(const TMetricDef& def, MemTrackerMetricType type, + MemTracker* mem_tracker); + + const MemTrackerMetricType type_; + const MemTracker* mem_tracker_; +}; + /// Registers common tcmalloc memory metrics. If 'register_jvm_metrics' is true, the JVM /// memory metrics are also registered. If 'global_reservations' and 'buffer_pool' are /// not NULL, also register buffer pool metrics. 
http://git-wip-us.apache.org/repos/asf/impala/blob/8283081b/common/thrift/metrics.json ---------------------------------------------------------------------- diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index cce34d7..9498479 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -1655,5 +1655,25 @@ "units": "UNIT", "kind": "COUNTER", "key": "rpc.$0.rpcs_queue_overflow" + }, + { + "description": "Memtracker $0 Current Usage Bytes", + "contexts": [ + "IMPALAD" + ], + "label": "Memtracker $0 Current Usage Bytes", + "units": "BYTES", + "kind": "GAUGE", + "key": "mem-tracker.$0.current_usage_bytes" + }, + { + "description": "Memtracker $0 Peak Usage Bytes", + "contexts": [ + "IMPALAD" + ], + "label": "Memtracker $0 Peak Usage Bytes", + "units": "BYTES", + "kind": "GAUGE", + "key": "mem-tracker.$0.peak_usage_bytes" } ] http://git-wip-us.apache.org/repos/asf/impala/blob/8283081b/tests/custom_cluster/test_krpc_metrics.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_krpc_metrics.py b/tests/custom_cluster/test_krpc_metrics.py index ec56f41..de16ccc 100644 --- a/tests/custom_cluster/test_krpc_metrics.py +++ b/tests/custom_cluster/test_krpc_metrics.py @@ -69,12 +69,8 @@ class TestKrpcMetrics(CustomClusterTestSuite): assert before < after - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \ - -datastream_service_num_svc_threads=1') - def test_krpc_queue_overflow_metrics(self, vector): - """Test that rejected RPCs show up on the /metrics debug web page.
- """ + def get_metric(self, name): + """Finds the metric with name 'name' and returns its value as an int.""" def iter_metrics(group): for m in group['metrics']: yield m @@ -82,16 +78,31 @@ class TestKrpcMetrics(CustomClusterTestSuite): for m in iter_metrics(c): yield m - def get_metric(name): - metrics = self.get_debug_page(self.METRICS_URL)['metric_group'] - for m in iter_metrics(metrics): - if m['name'] == name: - return int(m['value']) + metrics = self.get_debug_page(self.METRICS_URL)['metric_group'] + for m in iter_metrics(metrics): + if m['name'] == name: + return int(m['value']) + assert False, "Could not find metric: %s" % name + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \ + -datastream_service_num_svc_threads=1') + def test_krpc_queue_overflow_metrics(self, vector): + """Test that rejected RPCs show up on the /metrics debug web page. + """ metric_name = 'rpc.impala.DataStreamService.rpcs_queue_overflow' - before = get_metric(metric_name) + before = self.get_metric(metric_name) assert before == 0 self.client.execute(self.TEST_QUERY) - after = get_metric(metric_name) + after = self.get_metric(metric_name) assert before < after + + @pytest.mark.execute_serially + def test_krpc_service_queue_metrics(self, vector): + """Test that memory usage metrics for the data stream service queue show up on the + /metrics debug web page. + """ + self.client.execute(self.TEST_QUERY) + assert self.get_metric('mem-tracker.DataStreamService.current_usage_bytes') >= 0 + assert self.get_metric('mem-tracker.DataStreamService.peak_usage_bytes') > 0