This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 0ddcaaabc [client] KUDU-3351: Add ResourceMetricsPB into WriteResponsePB
0ddcaaabc is described below
commit 0ddcaaabc97c85a4715ae79ff5604feb9b342779
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Apr 26 14:35:19 2022 -0700
[client] KUDU-3351: Add ResourceMetricsPB into WriteResponsePB
KUDU-1563 adds support for INSERT_IGNORE, UPDATE_IGNORE, and
DELETE_IGNORE. However, it was lacking the per-session metrics about how
many rows get ignored vs modified. This patch implements the per-session
metrics by introducing a new ResourceMetricsPB field into the
WriteResponsePB that's populated in every response sent back to the
client.
Change-Id: I9adefd64b0058c66274a00e1b12334653fcab2b3
Reviewed-on: http://gerrit.cloudera.org:8080/18451
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/client/batcher.cc | 5 ++++
src/kudu/client/client-test.cc | 33 ++++++++++++++++++++++
src/kudu/client/client.cc | 4 +++
src/kudu/client/client.h | 3 ++
src/kudu/client/resource_metrics.h | 1 +
src/kudu/client/session-internal.cc | 23 ++++++++++++++-
src/kudu/client/session-internal.h | 11 ++++++++
.../integration-tests/exactly_once_writes-itest.cc | 8 ++++++
src/kudu/tablet/ops/write_op.cc | 23 ++++++++++++++-
src/kudu/tablet/ops/write_op.h | 5 ++++
src/kudu/tserver/tserver.proto | 24 ++++++++++++++++
11 files changed, 138 insertions(+), 2 deletions(-)
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 2db5033b0..cfde2eca3 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -966,6 +966,11 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
had_errors_ = true;
}
+ // Collect metrics
+ if (sp::shared_ptr<KuduSession> session = weak_session_.lock()) {
+ session->data_->UpdateWriteOpMetrics(rpc.resp().resource_metrics());
+ }
+
// Remove all the ops from the "in-flight" list. It's essential to do so
// _after_ adding all errors into the collector, otherwise there might be
// a race which manifests itself as described at KUDU-1743. Essentially,
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index a7c7e9835..8543fbdc2 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -2936,6 +2936,24 @@ static void DoTestVerifyRows(const
shared_ptr<KuduTable>& tbl, int num_rows) {
}
}
+static void DoVerifyMetrics(const KuduSession* session,  // Test helper: asserts the session's write-op counters equal the expected values.
+ int64_t successful_inserts,
+ int64_t insert_ignore_errors,
+ int64_t successful_upserts,
+ int64_t successful_updates,
+ int64_t update_ignore_errors,
+ int64_t successful_deletes,
+ int64_t delete_ignore_errors) {  // one expected value per counter; each is compared with the same-named metric below
+ auto metrics = session->GetWriteOpMetrics().Get();  // 'auto' stores the result of Get() by value, so the checks below see one fixed copy
+ ASSERT_EQ(successful_inserts, metrics["successful_inserts"]);  // lookup keys are the ResourceMetricsPB field names
+ ASSERT_EQ(insert_ignore_errors, metrics["insert_ignore_errors"]);
+ ASSERT_EQ(successful_upserts, metrics["successful_upserts"]);
+ ASSERT_EQ(successful_updates, metrics["successful_updates"]);
+ ASSERT_EQ(update_ignore_errors, metrics["update_ignore_errors"]);
+ ASSERT_EQ(successful_deletes, metrics["successful_deletes"]);
+ ASSERT_EQ(delete_ignore_errors, metrics["delete_ignore_errors"]);
+}  // NOTE(review): a failing ASSERT_EQ only returns from this helper; call sites invoke it bare, so the calling test continues -- consider NO_FATALS at the call sites (confirm it is available here).
+
TEST_F(ClientTest, TestInsertIgnore) {
shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(10000);
@@ -2945,6 +2963,7 @@ TEST_F(ClientTest, TestInsertIgnore) {
unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
ASSERT_OK(session->Apply(insert.release()));
DoTestVerifyRows(client_table_, 1);
+ DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0);
}
{
@@ -2952,6 +2971,7 @@ TEST_F(ClientTest, TestInsertIgnore) {
unique_ptr<KuduInsertIgnore>
insert_ignore(BuildTestInsertIgnore(client_table_.get(), 1));
ASSERT_OK(session->Apply(insert_ignore.release()));
DoTestVerifyRows(client_table_, 1);
+ DoVerifyMetrics(session.get(), 1, 1, 0, 0, 0, 0, 0);
}
{
@@ -2963,6 +2983,7 @@ TEST_F(ClientTest, TestInsertIgnore) {
ASSERT_OK(insert_ignore->mutable_row()->SetInt32("non_null_with_default",
999));
ASSERT_OK(session->Apply(insert_ignore.release())); // returns ok but
results in no change
DoTestVerifyRows(client_table_, 1);
+ DoVerifyMetrics(session.get(), 1, 2, 0, 0, 0, 0, 0);
}
{
@@ -2970,6 +2991,7 @@ TEST_F(ClientTest, TestInsertIgnore) {
unique_ptr<KuduInsertIgnore>
insert_ignore(BuildTestInsertIgnore(client_table_.get(), 2));
ASSERT_OK(session->Apply(insert_ignore.release()));
DoTestVerifyRows(client_table_, 2);
+ DoVerifyMetrics(session.get(), 2, 2, 0, 0, 0, 0, 0);
}
}
@@ -2983,12 +3005,14 @@ TEST_F(ClientTest, TestUpdateIgnore) {
unique_ptr<KuduUpdateIgnore>
update_ignore(BuildTestUpdateIgnore(client_table_.get(), 1));
ASSERT_OK(session->Apply(update_ignore.release()));
DoTestVerifyRows(client_table_, 0);
+ DoVerifyMetrics(session.get(), 0, 0, 0, 0, 1, 0, 0);
}
{
unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
ASSERT_OK(session->Apply(insert.release()));
DoTestVerifyRows(client_table_, 1);
+ DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0);
}
{
@@ -2999,6 +3023,7 @@ TEST_F(ClientTest, TestUpdateIgnore) {
ASSERT_OK(update_ignore->mutable_row()->SetStringCopy("string_val", "hello
world"));
ASSERT_OK(update_ignore->mutable_row()->SetInt32("non_null_with_default",
999));
ASSERT_OK(session->Apply(update_ignore.release()));
+ DoVerifyMetrics(session.get(), 1, 0, 0, 1, 1, 0, 0);
vector<string> rows;
KuduScanner scanner(client_table_.get());
@@ -3018,6 +3043,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
ASSERT_OK(session->Apply(insert.release()));
DoTestVerifyRows(client_table_, 1);
+ DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0);
}
{
@@ -3025,6 +3051,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
unique_ptr<KuduDeleteIgnore>
delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1));
ASSERT_OK(session->Apply(delete_ignore.release()));
DoTestVerifyRows(client_table_, 0);
+ DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 0);
}
{
@@ -3032,6 +3059,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
unique_ptr<KuduDeleteIgnore>
delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1));
ASSERT_OK(session->Apply(delete_ignore.release()));
DoTestVerifyRows(client_table_, 0);
+ DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 1);
}
}
@@ -4438,6 +4466,7 @@ TEST_F(ClientTest, TestUpsert) {
// Perform and verify UPSERT which acts as an INSERT.
ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 1, "original
row"));
FlushSessionOrDie(session);
+ DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0);
{
vector<string> rows;
@@ -4450,6 +4479,7 @@ TEST_F(ClientTest, TestUpsert) {
// Perform and verify UPSERT which acts as an UPDATE.
ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 2, "upserted
row"));
FlushSessionOrDie(session);
+ DoVerifyMetrics(session.get(), 0, 0, 2, 0, 0, 0, 0);
{
vector<string> rows;
@@ -4468,6 +4498,7 @@ TEST_F(ClientTest, TestUpsert) {
ASSERT_OK(row->SetInt32("non_null_with_default", 999));
ASSERT_OK(session->Apply(update.release()));
FlushSessionOrDie(session);
+ DoVerifyMetrics(session.get(), 0, 0, 2, 1, 0, 0, 0);
}
{
vector<string> rows;
@@ -4481,6 +4512,7 @@ TEST_F(ClientTest, TestUpsert) {
// column, and therefore should not revert it back to its default.
ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 3, "upserted
row 2"));
FlushSessionOrDie(session);
+ DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 0, 0);
{
vector<string> rows;
ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
@@ -4492,6 +4524,7 @@ TEST_F(ClientTest, TestUpsert) {
// Delete the row.
ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1));
FlushSessionOrDie(session);
+ DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 1, 0);
{
vector<string> rows;
ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 2e37713c8..61349527a 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1447,6 +1447,10 @@ KuduClient* KuduSession::client() const {
return data_->client_.get();
}
+const ResourceMetrics& KuduSession::GetWriteOpMetrics() const {  // Accessor for the session's write operation metrics.
+ return data_->write_op_metrics_;  // hands back a reference to the member held by KuduSession::Data, not a copy
+}
+
////////////////////////////////////////////////////////////
// KuduTableAlterer
////////////////////////////////////////////////////////////
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 35dab86e2..bf94a7b26 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -2566,6 +2566,9 @@ class KUDU_EXPORT KuduSession : public
sp::enable_shared_from_this<KuduSession>
/// @return Client for the session: pointer to the associated client object.
KuduClient* client() const;
+ /// @return Cumulative write operation metrics since the beginning of the session.
+ const ResourceMetrics& GetWriteOpMetrics() const;
+
private:
class KUDU_NO_EXPORT Data;
diff --git a/src/kudu/client/resource_metrics.h
b/src/kudu/client/resource_metrics.h
index 015087dab..dff31fedc 100644
--- a/src/kudu/client/resource_metrics.h
+++ b/src/kudu/client/resource_metrics.h
@@ -65,6 +65,7 @@ class KUDU_EXPORT ResourceMetrics {
private:
friend class KuduScanner;
+ friend class KuduSession;
class KUDU_NO_EXPORT Data;
Data* data_;
};
diff --git a/src/kudu/client/session-internal.cc
b/src/kudu/client/session-internal.cc
index 320ccac18..0cb0be561 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -22,29 +22,37 @@
#include <utility>
#include <glog/logging.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/message.h>
#include "kudu/client/batcher.h"
#include "kudu/client/callbacks.h"
#include "kudu/client/error_collector.h"
+#include "kudu/client/resource_metrics-internal.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
+#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/logging.h"
-
+using google::protobuf::FieldDescriptor;
+using google::protobuf::Reflection;
using kudu::client::internal::Batcher;
using kudu::client::internal::ErrorCollector;
using kudu::client::sp::shared_ptr;
using kudu::client::sp::weak_ptr;
using kudu::rpc::Messenger;
+using kudu::tserver::ResourceMetricsPB;
using std::unique_ptr;
using strings::Substitute;
namespace kudu {
+
namespace client {
KuduSession::Data::Data(shared_ptr<KuduClient> client,
@@ -577,5 +585,18 @@ size_t KuduSession::Data::GetBatchersCountForTests() const
{
return batchers_num_;
}
+void KuduSession::Data::UpdateWriteOpMetrics(const ResourceMetricsPB&  // Adds the counters carried by 'resource_metrics' to write_op_metrics_.
resource_metrics) {
+ const auto* reflection = resource_metrics.GetReflection();
+ const auto* desc = resource_metrics.GetDescriptor();
+ for (int i = 0; i < desc->field_count(); i++) {  // walk every field declared in the message instead of naming each one
+ const FieldDescriptor* field = desc->field(i);
+ if (reflection->HasField(resource_metrics, field) &&  // fields not set in this message are skipped
+ field->cpp_type() == FieldDescriptor::CPPTYPE_INT64) {  // only int64 fields are read; NOTE(review): HasField runs before this type check and expects a singular field -- confirm ResourceMetricsPB has no repeated fields
+ write_op_metrics_.data_->Increment(StringPiece(field->name()),  // the metric key is the protobuf field name
+
reflection->GetInt64(resource_metrics, field));
+ }
+ }
+}
+
} // namespace client
} // namespace kudu
diff --git a/src/kudu/client/session-internal.h
b/src/kudu/client/session-internal.h
index db9de7b83..66962dd9a 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -25,6 +25,7 @@
#include "kudu/client/batcher.h"
#include "kudu/client/client.h"
#include "kudu/client/error_collector.h"
+#include "kudu/client/resource_metrics.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/txn_id.h"
#include "kudu/gutil/macros.h"
@@ -40,6 +41,10 @@ namespace rpc {
class Messenger;
} // namespace rpc
+namespace tserver {
+class ResourceMetricsPB;
+} // namespace tserver
+
namespace client {
class KuduStatusCallback;
@@ -160,6 +165,9 @@ class KuduSession::Data {
// primary key and perform other validations with regard to the column
schema.
Status ValidateWriteOperation(KuduWriteOperation* op) const;
+ // Adds the given write operation metrics to the session's total write operation metrics.
+ void UpdateWriteOpMetrics(const tserver::ResourceMetricsPB& metrics);
+
// This constant represents a meaningful name for the first argument in
// expressions like FlushCurrentBatcher(1, cbk): this is the watermark
// corresponding to 1 byte of data. This watermark level is the minimum
@@ -246,6 +254,9 @@ class KuduSession::Data {
// returns true only if the upper-level session is a transactional one.
const TxnId txn_id_;
+ // Metrics of all write operations in the session.
+ ResourceMetrics write_op_metrics_;
+
private:
FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc
b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index 51a43c209..9dece4513 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -28,6 +28,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/common/row_operations.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/common/wire_protocol.h"
@@ -174,6 +175,13 @@ void
ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(Sockaddr address,
}
// If there was no error, store the response.
if (status.ok()) {
+ if (response.has_resource_metrics()) {
+ // Clear resource_metrics because it is not relevant to this test.
+ // We are asserting that all responses are equal, but in case of exactly-once
+ // RPC semantics, metrics in retried requests may all come zeroed out or even
+ // not populated.
+ response.clear_resource_metrics();
+ }
responses->push_back(response);
break;
}
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index b5ae1298b..099e2b782 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -81,11 +81,12 @@ using strings::Substitute;
namespace kudu {
namespace tablet {
-using pb_util::SecureShortDebugString;
using consensus::CommitMsg;
using consensus::DriverType;
using consensus::ReplicateMsg;
using consensus::WRITE_OP;
+using pb_util::SecureShortDebugString;
+using tserver::ResourceMetricsPB;
using tserver::TabletServerErrorPB;
using tserver::WriteRequestPB;
using tserver::WriteResponsePB;
@@ -290,6 +291,11 @@ Status WriteOp::Apply(CommitMsg** commit_msg) {
void WriteOp::Finish(OpResult result) {
TRACE_EVENT0("op", "WriteOp::Finish");
+ if (result == Op::APPLIED) {
+ // Populate response metrics.
+ state()->FillResponseMetrics(type());
+ }
+
state()->FinishApplyingOrAbort(result);
if (PREDICT_FALSE(result == Op::ABORTED)) {
@@ -655,5 +661,20 @@ string WriteOpState::ToString() const {
row_ops_str);
}
+void WriteOpState::FillResponseMetrics(consensus::DriverType type) {  // Copies the counters in op_metrics_ into response_'s resource_metrics.
+ const auto& op_m = op_metrics_;
+ tserver::ResourceMetricsPB* resp_metrics =
response_->mutable_resource_metrics();
+ resp_metrics->set_successful_inserts(op_m.successful_inserts);  // the seven row-operation counters are always copied
+ resp_metrics->set_insert_ignore_errors(op_m.insert_ignore_errors);
+ resp_metrics->set_successful_upserts(op_m.successful_upserts);
+ resp_metrics->set_successful_updates(op_m.successful_updates);
+ resp_metrics->set_update_ignore_errors(op_m.update_ignore_errors);
+ resp_metrics->set_successful_deletes(op_m.successful_deletes);
+ resp_metrics->set_delete_ignore_errors(op_m.delete_ignore_errors);
+ if (type == consensus::LEADER && external_consistency_mode() == COMMIT_WAIT)  // commit-wait duration is copied only for a LEADER driver in COMMIT_WAIT mode
{
+
resp_metrics->set_commit_wait_duration_usec(op_m.commit_wait_duration_usec);
+ }
+}
+
} // namespace tablet
} // namespace kudu
diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h
index ea618873f..5aa11dec7 100644
--- a/src/kudu/tablet/ops/write_op.h
+++ b/src/kudu/tablet/ops/write_op.h
@@ -261,6 +261,11 @@ class WriteOpState : public OpState {
// the partition lock.
void TransferOrReleasePartitionLock();
+ // Copy metrics from 'op_metrics_' into the response's 'resource_metrics'.
+ // Should only be called before FinishApplyingOrAbort() to make sure that 'response_'
+ // has not been released.
+ void FillResponseMetrics(consensus::DriverType type);
+
private:
// Releases all the row locks acquired by this op.
void ReleaseRowLocks();
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 82616b0f8..a6145b63b 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -185,6 +185,14 @@ message WriteResponsePB {
// The timestamp chosen by the server for this write.
// TODO KUDU-611 propagate timestamps with server signature.
optional fixed64 timestamp = 3;
+
+ // The write operation metrics of this RPC.
+ // These metrics contain the number of successful/unsuccessful operations
+ // ('successful_inserts' to 'delete_ignore_errors' fields in ResourceMetricsPB message)
+ // related to the associated write request.
+ // Additionally, contains 'commit_wait_duration_usec' metric if the responding server is
+ // a LEADER and the request external_consistency_mode is COMMIT_WAIT.
+ optional ResourceMetricsPB resource_metrics = 4;
}
// A list tablets request
@@ -404,6 +412,22 @@ message ResourceMetricsPB {
optional int64 cpu_user_nanos = 6;
// Total elapsed CPU system time in nanoseconds for all scan rpc requests
for this scanner.
optional int64 cpu_system_nanos = 7;
+ // Total number of successful INSERT/INSERT_IGNORE operation.
+ optional int64 successful_inserts = 8;
+ // Total number of unsuccessful INSERT_IGNORE operation.
+ optional int64 insert_ignore_errors = 9;
+ // Total number of successful UPSERT operation.
+ optional int64 successful_upserts = 10;
+ // Total number of successful UPDATE/UPDATE_IGNORE operation.
+ optional int64 successful_updates = 11;
+ // Total number of unsuccessful UPDATE_IGNORE operation.
+ optional int64 update_ignore_errors = 12;
+ // Total number of successful DELETE/DELETE_IGNORE operation.
+ optional int64 successful_deletes = 13;
+ // Total number of unsuccessful DELETE_IGNORE operation.
+ optional int64 delete_ignore_errors = 14;
+ // Total observed commit wait duration in microseconds.
+ optional int64 commit_wait_duration_usec = 15;
}
message ScanResponsePB {