This is an automated email from the ASF dual-hosted git repository. abukor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f4176b1ad00c3fa01b808bbdd58f08f0aa4ab471 Author: Andrew Wong <[email protected]> AuthorDate: Mon Mar 2 21:18:41 2020 -0800 subprocess: plumb Java metrics into C++ This plumbs metrics from the Java subprocess into C++ and encapsulates the common code used to interact with a SubprocessServer (including the metrics parsing) into the new SubprocessProxy template class. This template is specialized for Echo{Request,Response}PB messages as the new test-only EchoSubprocess, and adds echo-specific histogram metrics based on those returned by the Echo Java subprocess. Change-Id: I7260ea13717dfd4af0138f77dfb6e5d239b3bee2 Reviewed-on: http://gerrit.cloudera.org:8080/15344 Tested-by: Kudu Jenkins Reviewed-by: Hao Hao <[email protected]> Reviewed-by: Adar Dembo <[email protected]> --- src/kudu/subprocess/CMakeLists.txt | 1 + src/kudu/subprocess/subprocess_proxy-test.cc | 167 +++++++++++++++++++++++++++ src/kudu/subprocess/subprocess_proxy.h | 102 ++++++++++++++++ 3 files changed, 270 insertions(+) diff --git a/src/kudu/subprocess/CMakeLists.txt b/src/kudu/subprocess/CMakeLists.txt index 62f277c..8f284f5 100644 --- a/src/kudu/subprocess/CMakeLists.txt +++ b/src/kudu/subprocess/CMakeLists.txt @@ -72,4 +72,5 @@ if (NOT NO_TESTS) ) endif() +ADD_KUDU_TEST(subprocess_proxy-test) ADD_KUDU_TEST(subprocess_server-test) diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc new file mode 100644 index 0000000..6d26268 --- /dev/null +++ b/src/kudu/subprocess/subprocess_proxy-test.cc @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/subprocess/subprocess_proxy.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/env.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::make_shared;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_length,
+    "Echo subprocess inbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of request messages in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
+    "Echo subprocess outbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of response messages in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_time_ms,
+    "Echo subprocess inbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_time_ms,
+    "Echo subprocess outbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
+    "Echo subprocess execution time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent executing the Echo subprocess request, excluding "
+    "time spent in the subprocess queues",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+
+namespace kudu {
+namespace subprocess {
+
+#define GINIT(member, x) member = METRIC_##x.Instantiate(entity, 0)
+#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
+// Echo-specific histograms, instantiated from the prototypes defined above.
+struct EchoSubprocessMetrics : public SubprocessMetrics {
+  explicit EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
+    HISTINIT(inbound_queue_length, echo_subprocess_inbound_queue_length);
+    HISTINIT(outbound_queue_length, echo_subprocess_outbound_queue_length);
+    HISTINIT(inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
+    HISTINIT(outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
+    HISTINIT(execution_time_ms, echo_subprocess_execution_time_ms);
+  }
+};
+#undef HISTINIT
+#undef GINIT
+
+typedef SubprocessProxy<EchoRequestPB, EchoResponsePB, EchoSubprocessMetrics> EchoSubprocess;
+
+// Fixture that launches the Echo subprocess JAR (next to the test binary) before each test.
+class EchoSubprocessTest : public KuduTest {
+ public:
+  EchoSubprocessTest()
+      : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
+                                                        "subprocess_proxy-test")) {}
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(ResetEchoSubprocess());
+  }
+
+  Status ResetEchoSubprocess() {
+    string exe;
+    RETURN_NOT_OK(env_->GetExecutablePath(&exe));
+    const string bin_dir = DirName(exe);
+    string java_home;
+    RETURN_NOT_OK(FindHomeDir("java", bin_dir, &java_home));
+    vector<string> argv = {
+      Substitute("$0/bin/java", java_home),
+      "-jar", Substitute("$0/kudu-subprocess-echo.jar", bin_dir)
+    };
+    echo_subprocess_ = make_shared<EchoSubprocess>(std::move(argv), metric_entity_);
+    return echo_subprocess_->Start();
+  }
+
+ protected:
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  shared_ptr<EchoSubprocess> echo_subprocess_;
+};
+
+TEST_F(EchoSubprocessTest, TestBasicMetrics) {
+  const string kMessage = "don't catch you slippin' now";
+  const int64_t kSleepMs = 1000;
+  EchoRequestPB req;
+  req.set_data(kMessage);
+  req.set_sleep_ms(kSleepMs);
+  EchoResponsePB resp;
+  ASSERT_OK(echo_subprocess_->Execute(req, &resp));
+  ASSERT_EQ(kMessage, resp.data());
+
+  // There shouldn't be anything in the subprocess queues.
+  Histogram* in_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_inbound_queue_length).get());
+  ASSERT_EQ(1, in_len_hist->TotalCount());
+  ASSERT_EQ(0, in_len_hist->MaxValueForTests());
+  Histogram* out_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_outbound_queue_length).get());
+  ASSERT_EQ(1, out_len_hist->TotalCount());
+  ASSERT_EQ(0, out_len_hist->MaxValueForTests());
+
+  // We should have some non-negative queue times.
+  Histogram* out_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_outbound_queue_time_ms).get());
+  ASSERT_EQ(1, out_hist->TotalCount());
+  ASSERT_LE(0, out_hist->MaxValueForTests());
+  Histogram* in_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_inbound_queue_time_ms).get());
+  ASSERT_EQ(1, in_hist->TotalCount());
+  ASSERT_LE(0, in_hist->MaxValueForTests());
+
+  // The execution should've taken at least our sleep time (inclusive).
+  Histogram* exec_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_execution_time_ms).get());
+  ASSERT_EQ(1, exec_hist->TotalCount());
+  ASSERT_LE(kSleepMs, exec_hist->MaxValueForTests());
+}
+
+} // namespace subprocess
+} // namespace kudu
diff --git a/src/kudu/subprocess/subprocess_proxy.h b/src/kudu/subprocess/subprocess_proxy.h
new file mode 100644
index 0000000..3d60311
--- /dev/null
+++ b/src/kudu/subprocess/subprocess_proxy.h
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/server.h"
+#include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace subprocess {
+
+// Histograms that record the metrics the subprocess reports in its responses.
+// TODO(awong): add server metrics.
+struct SubprocessMetrics {
+  scoped_refptr<Histogram> inbound_queue_length;
+  scoped_refptr<Histogram> outbound_queue_length;
+  scoped_refptr<Histogram> inbound_queue_time_ms;
+  scoped_refptr<Histogram> outbound_queue_time_ms;
+  scoped_refptr<Histogram> execution_time_ms;
+};
+
+// Template that wraps a SubprocessServer, exposing only the underlying ReqPB
+// and RespPB as an interface. 'MetricsT' is constructed from the given
+// MetricEntity and must expose the histograms declared by SubprocessMetrics.
+template<class ReqPB, class RespPB, class MetricsT>
+class SubprocessProxy {
+ public:
+  SubprocessProxy(std::vector<std::string> argv, const scoped_refptr<MetricEntity>& entity)
+      : server_(std::move(argv)), metrics_(entity) {}
+
+  // Starts the underlying subprocess.
+  Status Start() {
+    return server_.Init();
+  }
+
+  // Executes the given request and populates the given response, returning a
+  // non-OK Status if there was an error sending the request (e.g. timed out),
+  // if the subprocess reported an error, or if the response can't be unpacked.
+  Status Execute(const ReqPB& req, RespPB* resp) {
+    SubprocessRequestPB sreq;
+    sreq.mutable_request()->PackFrom(req);
+    SubprocessResponsePB sresp;
+    RETURN_NOT_OK(server_.Execute(&sreq, &sresp));
+    // The subprocess metrics should still be valid regardless of whether there
+    // was an error, so record them before any of the early returns below.
+    if (sresp.has_metrics()) {
+      ParseMetricsPB(sresp.metrics());
+    }
+    if (sresp.has_error()) {
+      return StatusFromPB(sresp.error());
+    }
+    if (!sresp.response().UnpackTo(resp)) {
+      LOG(ERROR) << strings::Substitute("unable to unpack response: $0",
+                                        pb_util::SecureDebugString(sresp));
+      return Status::Corruption("unable to unpack response");
+    }
+    return Status::OK();
+  }
+ private:
+  // Parses the given metrics protobuf and records each reported value in the
+  // corresponding histogram of 'metrics_'.
+  void ParseMetricsPB(const SubprocessMetricsPB& pb) {
+    DCHECK(pb.has_inbound_queue_length());
+    DCHECK(pb.has_outbound_queue_length());
+    DCHECK(pb.has_inbound_queue_time_ms());
+    DCHECK(pb.has_outbound_queue_time_ms());
+    DCHECK(pb.has_execution_time_ms());
+    metrics_.inbound_queue_length->Increment(pb.inbound_queue_length());
+    metrics_.outbound_queue_length->Increment(pb.outbound_queue_length());
+    metrics_.inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
+    metrics_.outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
+    metrics_.execution_time_ms->Increment(pb.execution_time_ms());
+  }
+
+  SubprocessServer server_;
+  MetricsT metrics_;
+};
+
+} // namespace subprocess
+} // namespace kudu
