This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new e64eb7c KUDU-2612: C++ client sets txn_id in WriteRequestPB
e64eb7c is described below
commit e64eb7c7ceceec76aeb5cceac9dc42cc0e78f1ec
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Oct 20 16:18:10 2020 -0700
KUDU-2612: C++ client sets txn_id in WriteRequestPB
This patch introduces an optional 'txn_id' field into WriteRequestPB
protobuf message and changes Kudu C++ client to populate it for
all requests sent out from a transactional KuduSession. In addition,
this patch contains a unit-level test to verify that KuduSession and
Batcher have their txn_id_ fields set accordingly. It's assumed
that follow-up changelists will add more comprehensive end-to-end
coverage once the transactional API for the Kudu client is introduced
and tablet servers process the WriteRequestPB::txn_id field as
prescribed by the design document [1].
A follow-up changelist will introduce corresponding client API changes
for transaction-related operations, and with those it will be possible
to begin, commit, and rollback a transaction. However, I think it's
important to highlight a few assumptions that Andrew and I discussed
offline: this patch assumes that a single KuduSession isn't allowed
to have a mix of transactional and non-transactional write operations.
Also, all write operations handled by a KuduSession instance can be
attributed only to a single (the same) transaction. In other words,
it's assumed that separate KuduSession instances should be created
to handle operations pertaining to different transactions. This
restriction doesn't seem to be too harsh, but it helps to avoid a
complicated dance in handling already accumulated write operations in
KuduClient. Otherwise, it would be necessary to flush buffered
operations if switching from non-transactional writes to transactional
ones and back. If it turns out that the functionality of mixing
transactional and non-transactional write operations is necessary,
this restriction can be removed: it's feasible to add transaction
control operations to KuduSession in the future (i.e.
{Begin,Commit,Abort}Transaction() methods), but it will entail adding
more complexity to the already convoluted client-side code for handling
buffered write operations.
For now, I decided not to expose txn_id via the public client API.
I also specifically avoided exposing txn_id via KuduWriteOperation,
keeping it a private member of the internal classes KuduSession::Data
and Batcher.
[1] https://s.apache.org/kudu-multi-row-transaction-design
Change-Id: Ib60cb0ea8066e2c6417ebe4b2a24aff3512b44f1
Reviewed-on: http://gerrit.cloudera.org:8080/16625
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
Reviewed-by: Hao Hao <[email protected]>
---
src/kudu/client/batcher.cc | 10 +++++++-
src/kudu/client/batcher.h | 19 ++++++++++++--
src/kudu/client/client-test.cc | 50 +++++++++++++++++++++++++++++++++++++
src/kudu/client/client.cc | 7 +++++-
src/kudu/client/client.h | 3 +++
src/kudu/client/session-internal.cc | 22 ++++++++--------
src/kudu/client/session-internal.h | 14 +++++++----
src/kudu/tserver/tserver.proto | 3 +++
8 files changed, 107 insertions(+), 21 deletions(-)
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index abe0dd6..6fbb78d 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -42,6 +42,7 @@
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/row_operations.h"
+#include "kudu/common/txn_id.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/atomic_refcount.h"
@@ -80,6 +81,7 @@ using kudu::security::SignedTokenPB;
using kudu::tserver::WriteRequestPB;
using kudu::tserver::WriteResponsePB;
using kudu::tserver::WriteResponsePB_PerRowErrorPB;
+using kudu::TxnId;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
@@ -296,6 +298,10 @@ WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
req_.set_propagated_timestamp(propagated_timestamp);
}
+ if (batcher->txn_id().IsValid()) {
+ req_.set_txn_id(batcher->txn_id());
+ }
+
// Set up schema
CHECK_OK(SchemaToPB(*schema, req_.mutable_schema(),
SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES |
@@ -580,11 +586,13 @@ void WriteRpc::GotNewAuthzTokenRetryCb(const Status&
status) {
Batcher::Batcher(KuduClient* client,
scoped_refptr<ErrorCollector> error_collector,
sp::weak_ptr<KuduSession> session,
- kudu::client::KuduSession::ExternalConsistencyMode
consistency_mode)
+ kudu::client::KuduSession::ExternalConsistencyMode
consistency_mode,
+ const TxnId& txn_id)
: state_(kGatheringOps),
client_(client),
weak_session_(std::move(session)),
consistency_mode_(consistency_mode),
+ txn_id_(txn_id),
error_collector_(std::move(error_collector)),
had_errors_(false),
flush_callback_(nullptr),
diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h
index f11148e..3874705 100644
--- a/src/kudu/client/batcher.h
+++ b/src/kudu/client/batcher.h
@@ -27,6 +27,7 @@
#include "kudu/client/client.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
+#include "kudu/common/txn_id.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
@@ -70,7 +71,8 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
Batcher(KuduClient* client,
scoped_refptr<ErrorCollector> error_collector,
client::sp::weak_ptr<KuduSession> session,
- kudu::client::KuduSession::ExternalConsistencyMode consistency_mode);
+ kudu::client::KuduSession::ExternalConsistencyMode consistency_mode,
+ const kudu::TxnId& txn_id);
// Abort the current batch. Any writes that were buffered and not yet sent
are
// discarded. Those that were sent may still be delivered. If there is a
pending Flush
@@ -125,6 +127,12 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
return buffer_bytes_used_.Load();
}
+ // Return the identifier of a multi-row transaction (if any) that all the
+ // accumulated write operations are part of.
+ const TxnId& txn_id() const {
+ return txn_id_;
+ }
+
// Compute in-buffer size for the given write operation.
static int64_t GetOperationSizeInBuffer(KuduWriteOperation* write_op) {
return write_op->SizeInBuffer();
@@ -185,7 +193,14 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
client::sp::weak_ptr<KuduSession> weak_session_;
// The consistency mode set in the session.
- kudu::client::KuduSession::ExternalConsistencyMode consistency_mode_;
+ const kudu::client::KuduSession::ExternalConsistencyMode consistency_mode_;
+
+ // The identifier of a transaction that is associated with operations
+ // processed by this batcher. All operations accumulated by one batcher
are either
+ // related to the same multi-row transaction or all of the accumulated
+ // operations are not a part of any multi-row transaction. In the latter
case,
+ // txn_id_.IsValid() would return 'false'.
+ const TxnId txn_id_;
// Errors are reported into this error collector.
scoped_refptr<ErrorCollector> error_collector_;
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index ca89a7a..8f95acc 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -45,6 +45,7 @@
#include <google/protobuf/util/message_differencer.h>
#include <gtest/gtest.h>
+#include "kudu/client/batcher.h"
#include "kudu/client/callbacks.h"
#include "kudu/client/client-internal.h"
#include "kudu/client/client-test-util.h"
@@ -70,6 +71,7 @@
#include "kudu/common/row.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
+#include "kudu/common/txn_id.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/casts.h"
@@ -6440,6 +6442,54 @@ TEST_F(ClientTest, TestRetrieveAuthzTokenInParallel) {
ASSERT_LT(num_reqs, kThreads);
}
+// This test verifies that txn_id is properly set in a transactional session
+// and its current batcher. This is a unit-level test scenario.
+TEST_F(ClientTest, TxnIdOfTransactionalSession) {
+ const auto apply_single_insert = [this] (KuduSession* s) {
+ ASSERT_OK(s->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ unique_ptr<KuduInsert> insert(client_table_->NewInsert());
+ ASSERT_OK(insert->mutable_row()->SetInt32("key", 0));
+ ASSERT_OK(insert->mutable_row()->SetInt32("int_val", 0));
+ ASSERT_OK(s->Apply(insert.release()));
+ };
+
+ // Check how relevant member fields are populated in case of
+ // non-transactional session.
+ {
+ KuduSession s(client_);
+
+ const auto& session_data_txn_id = s.data_->txn_id_;
+ ASSERT_FALSE(session_data_txn_id.IsValid());
+
+ NO_FATALS(apply_single_insert(&s));
+
+ // Make sure current batcher has txn_id_ set to a non-valid transaction
+ // identifier.
+ ASSERT_NE(nullptr, s.data_->batcher_.get());
+ const auto& batcher_txn_id = s.data_->batcher_->txn_id();
+ ASSERT_FALSE(batcher_txn_id.IsValid());
+ }
+
+ // Check how relevant member fields are populated in case of
+ // transactional session.
+ {
+ const TxnId kTxnId(0);
+ KuduSession s(client_, kTxnId);
+
+ const auto& session_data_txn_id = s.data_->txn_id_;
+ ASSERT_TRUE(session_data_txn_id.IsValid());
+ ASSERT_EQ(kTxnId.value(), session_data_txn_id.value());
+
+ NO_FATALS(apply_single_insert(&s));
+
+ // Make sure current batcher has txn_id_ member properly set.
+ ASSERT_NE(nullptr, s.data_->batcher_.get());
+ const auto& batcher_txn_id = s.data_->batcher_->txn_id();
+ ASSERT_TRUE(batcher_txn_id.IsValid());
+ ASSERT_EQ(kTxnId.value(), batcher_txn_id.value());
+ }
+}
+
// This test verifies that rows with column schema violations such as
// unset non-nullable columns (with no default value) are detected at the
client
// side while calling Apply() for corresponding write operations. So, that sort
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 16819ac..6c6bce4 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1154,7 +1154,12 @@ KuduError::~KuduError() {
////////////////////////////////////////////////////////////
KuduSession::KuduSession(const shared_ptr<KuduClient>& client)
- : data_(new KuduSession::Data(client, client->data_->messenger_)) {
+ : data_(new KuduSession::Data(client, client->data_->messenger_)) {
+}
+
+KuduSession::KuduSession(const shared_ptr<KuduClient>& client,
+ const TxnId& txn_id)
+ : data_(new KuduSession::Data(client, client->data_->messenger_, txn_id)) {
}
KuduSession::~KuduSession() {
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 92addfd..aa2a207 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -60,6 +60,7 @@ class MonoDelta;
class Partition;
class PartitionSchema;
class SecurityUnknownTskTest;
+class TxnId;
namespace client {
class KuduClient;
@@ -2039,8 +2040,10 @@ class KUDU_EXPORT KuduSession : public
sp::enable_shared_from_this<KuduSession>
friend class ClientTest;
FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
+ FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
explicit KuduSession(const sp::shared_ptr<KuduClient>& client);
+ KuduSession(const sp::shared_ptr<KuduClient>& client, const TxnId& txn_id);
// Owned.
Data* data_;
diff --git a/src/kudu/client/session-internal.cc
b/src/kudu/client/session-internal.cc
index cc967e5..5baa4ed 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -35,24 +35,21 @@
#include "kudu/rpc/messenger.h"
#include "kudu/util/logging.h"
+
+using kudu::client::internal::Batcher;
+using kudu::client::internal::ErrorCollector;
+using kudu::client::sp::shared_ptr;
+using kudu::client::sp::weak_ptr;
+using kudu::rpc::Messenger;
using std::unique_ptr;
using strings::Substitute;
namespace kudu {
-
-using rpc::Messenger;
-
namespace client {
-using internal::Batcher;
-using internal::ErrorCollector;
-
-using sp::shared_ptr;
-using sp::weak_ptr;
-
-
KuduSession::Data::Data(shared_ptr<KuduClient> client,
- std::weak_ptr<rpc::Messenger> messenger)
+ std::weak_ptr<rpc::Messenger> messenger,
+ const TxnId& txn_id)
: client_(std::move(client)),
messenger_(std::move(messenger)),
error_collector_(new ErrorCollector()),
@@ -66,6 +63,7 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client,
buffer_bytes_limit_(7 * 1024 * 1024),
buffer_watermark_pct_(50),
buffer_bytes_used_(0),
+ txn_id_(txn_id),
buffer_pre_flush_enabled_(true) {
}
@@ -475,7 +473,7 @@ Status KuduSession::Data::ApplyWriteOp(KuduWriteOperation*
write_op) {
// no thread-safety is advertised for the kudu::KuduSession interface.
scoped_refptr<Batcher> batcher(
new Batcher(client_.get(), error_collector_, session_,
- external_consistency_mode_));
+ external_consistency_mode_, txn_id_));
if (timeout_.Initialized()) {
batcher->SetTimeout(timeout_);
}
diff --git a/src/kudu/client/session-internal.h
b/src/kudu/client/session-internal.h
index e529f7d..db9de7b 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -14,8 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_CLIENT_SESSION_INTERNAL_H
-#define KUDU_CLIENT_SESSION_INTERNAL_H
+#pragma once
#include <cstddef>
#include <cstdint>
@@ -27,6 +26,7 @@
#include "kudu/client/client.h"
#include "kudu/client/error_collector.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/common/txn_id.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/condition_variable.h"
@@ -63,7 +63,8 @@ class KuduWriteOperation;
class KuduSession::Data {
public:
explicit Data(sp::shared_ptr<KuduClient> client,
- std::weak_ptr<rpc::Messenger> messenger);
+ std::weak_ptr<rpc::Messenger> messenger,
+ const kudu::TxnId& txn_id = kudu::TxnId::kInvalidTxnId);
void Init(sp::weak_ptr<KuduSession> session);
@@ -241,9 +242,14 @@ class KuduSession::Data {
// The total number of bytes used by buffered write operations.
int64_t buffer_bytes_used_; // protected by mutex_
+ // Transaction ID for this session's transaction (if any): txn_id_.IsValid()
+ // returns true only if the upper-level session is a transactional one.
+ const TxnId txn_id_;
+
private:
FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
+ FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
bool buffer_pre_flush_enabled_; // Set to 'false' only in test scenarios.
@@ -252,5 +258,3 @@ class KuduSession::Data {
} // namespace client
} // namespace kudu
-
-#endif
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 94a50c6..9dd51c0 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -143,6 +143,9 @@ message WriteRequestPB {
// An authorization token with which to authorize this request.
optional security.SignedTokenPB authz_token = 6;
+
+ // The transaction ID associated with this write request, if any.
+ optional int64 txn_id = 7;
}
message WriteResponsePB {