This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new e64eb7c  KUDU-2612: C++ client sets txn_id in WriteRequestPB
e64eb7c is described below

commit e64eb7c7ceceec76aeb5cceac9dc42cc0e78f1ec
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Oct 20 16:18:10 2020 -0700

    KUDU-2612: C++ client sets txn_id in WriteRequestPB
    
    This patch introduces an optional 'txn_id' field into WriteRequestPB
    protobuf message and changes Kudu C++ client to populate it for
    all requests sent out from a transactional KuduSession.  In addition,
    this patch contains a unit-level test to verify that KuduSession and
    Batcher have their txn_id_ fields set correspondingly.  It's assumed
    that follow-up changelists will add more comprehensive end-to-end
    coverage once transactional API for Kudu client is introduced and
    tablet servers process the WriteRequestPB:txn_id field as prescribed
    by the design document [1].
    
    A follow-up changelist will introduce corresponding client API changes
    for transaction-related operations, and with those it will be possible
    to begin, commit, and rollback a transaction.  However, I think it's
    important to highlight a few assumptions that Andrew and I discussed
    offline: this patch assumes that a single KuduSession isn't allowed
    to have a mix of transactional and non-transactional write operations.
    Also, all write operations handled by a KuduSession instance can be
    attributed only to a single (the same) transaction.  In other words,
    it's assumed that separate KuduSession instances should be created
    to handle operations pertaining to different transactions.  This
    restriction doesn't seem to be too harsh, but it helps to avoid a
    complicated dance in handling already accumulated write operations in
    KuduClient.  Otherwise, it would be necessary to flush buffered
    operations if switching from non-transactional writes to transactional
    ones and back.  If it turns out that the functionality of mixing
    transactional and non-transactional write operations is necessary,
    this restriction can be removed: it's feasible to add transaction
    control operations into KuduSession in future (i.e.
    {Begin,Commit,Abort}Transaction() methods), but it will entail adding
    more complexity into already convoluted client-side code of handling
    buffered write operations.
    
    For now, I decided not to expose txn_id via the public client API.
    Also, I specifically avoided exposing txn_id via KuduWriteOperation as
    well, keeping it a private member of internal classes KuduSession::Data
    and Batcher.
    
    [1] https://s.apache.org/kudu-multi-row-transaction-design
    
    Change-Id: Ib60cb0ea8066e2c6417ebe4b2a24aff3512b44f1
    Reviewed-on: http://gerrit.cloudera.org:8080/16625
    Reviewed-by: Andrew Wong <[email protected]>
    Tested-by: Kudu Jenkins
    Reviewed-by: Hao Hao <[email protected]>
---
 src/kudu/client/batcher.cc          | 10 +++++++-
 src/kudu/client/batcher.h           | 19 ++++++++++++--
 src/kudu/client/client-test.cc      | 50 +++++++++++++++++++++++++++++++++++++
 src/kudu/client/client.cc           |  7 +++++-
 src/kudu/client/client.h            |  3 +++
 src/kudu/client/session-internal.cc | 22 ++++++++--------
 src/kudu/client/session-internal.h  | 14 +++++++----
 src/kudu/tserver/tserver.proto      |  3 +++
 8 files changed, 107 insertions(+), 21 deletions(-)

diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index abe0dd6..6fbb78d 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -42,6 +42,7 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/atomic_refcount.h"
@@ -80,6 +81,7 @@ using kudu::security::SignedTokenPB;
 using kudu::tserver::WriteRequestPB;
 using kudu::tserver::WriteResponsePB;
 using kudu::tserver::WriteResponsePB_PerRowErrorPB;
+using kudu::TxnId;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -296,6 +298,10 @@ WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
     req_.set_propagated_timestamp(propagated_timestamp);
   }
 
+  if (batcher->txn_id().IsValid()) {
+    req_.set_txn_id(batcher->txn_id());
+  }
+
   // Set up schema
   CHECK_OK(SchemaToPB(*schema, req_.mutable_schema(),
                       SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES |
@@ -580,11 +586,13 @@ void WriteRpc::GotNewAuthzTokenRetryCb(const Status& 
status) {
 Batcher::Batcher(KuduClient* client,
                  scoped_refptr<ErrorCollector> error_collector,
                  sp::weak_ptr<KuduSession> session,
-                 kudu::client::KuduSession::ExternalConsistencyMode 
consistency_mode)
+                 kudu::client::KuduSession::ExternalConsistencyMode 
consistency_mode,
+                 const TxnId& txn_id)
   : state_(kGatheringOps),
     client_(client),
     weak_session_(std::move(session)),
     consistency_mode_(consistency_mode),
+    txn_id_(txn_id),
     error_collector_(std::move(error_collector)),
     had_errors_(false),
     flush_callback_(nullptr),
diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h
index f11148e..3874705 100644
--- a/src/kudu/client/batcher.h
+++ b/src/kudu/client/batcher.h
@@ -27,6 +27,7 @@
 #include "kudu/client/client.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/write_op.h"
+#include "kudu/common/txn_id.h"
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
@@ -70,7 +71,8 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   Batcher(KuduClient* client,
           scoped_refptr<ErrorCollector> error_collector,
           client::sp::weak_ptr<KuduSession> session,
-          kudu::client::KuduSession::ExternalConsistencyMode consistency_mode);
+          kudu::client::KuduSession::ExternalConsistencyMode consistency_mode,
+          const kudu::TxnId& txn_id);
 
   // Abort the current batch. Any writes that were buffered and not yet sent are
   // discarded. Those that were sent may still be delivered.  If there is a pending Flush
@@ -125,6 +127,12 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
     return buffer_bytes_used_.Load();
   }
 
+  // Return the identifier of a multi-row transaction (if any) that all the
+  // accumulated write operations are part of.
+  const TxnId& txn_id() const {
+    return txn_id_;
+  }
+
   // Compute in-buffer size for the given write operation.
   static int64_t GetOperationSizeInBuffer(KuduWriteOperation* write_op) {
     return write_op->SizeInBuffer();
@@ -185,7 +193,14 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   client::sp::weak_ptr<KuduSession> weak_session_;
 
   // The consistency mode set in the session.
-  kudu::client::KuduSession::ExternalConsistencyMode consistency_mode_;
+  const kudu::client::KuduSession::ExternalConsistencyMode consistency_mode_;
+
+  // The identifier of a transaction that is associated with operations
+  // processed by this batcher. All operations accumulated by one batcher are
+  // either related to the same multi-row transaction or all of the accumulated
+  // operations are not a part of any multi-row transaction. In the latter case,
+  // txn_id_.IsValid() would return 'false'.
+  const TxnId txn_id_;
 
   // Errors are reported into this error collector.
   scoped_refptr<ErrorCollector> error_collector_;
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index ca89a7a..8f95acc 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -45,6 +45,7 @@
 #include <google/protobuf/util/message_differencer.h>
 #include <gtest/gtest.h>
 
+#include "kudu/client/batcher.h"
 #include "kudu/client/callbacks.h"
 #include "kudu/client/client-internal.h"
 #include "kudu/client/client-test-util.h"
@@ -70,6 +71,7 @@
 #include "kudu/common/row.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
+#include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/casts.h"
@@ -6440,6 +6442,54 @@ TEST_F(ClientTest, TestRetrieveAuthzTokenInParallel) {
   ASSERT_LT(num_reqs, kThreads);
 }
 
+// This test verifies that txn_id is properly set in a transactional session
+// and its current batcher. This is a unit-level test scenario.
+TEST_F(ClientTest, TxnIdOfTransactionalSession) {
+  const auto apply_single_insert = [this] (KuduSession* s) {
+    ASSERT_OK(s->SetFlushMode(KuduSession::MANUAL_FLUSH));
+    unique_ptr<KuduInsert> insert(client_table_->NewInsert());
+    ASSERT_OK(insert->mutable_row()->SetInt32("key", 0));
+    ASSERT_OK(insert->mutable_row()->SetInt32("int_val", 0));
+    ASSERT_OK(s->Apply(insert.release()));
+  };
+
+  // Check how relevant member fields are populated in case of
+  // non-transactional session.
+  {
+    KuduSession s(client_);
+
+    const auto& session_data_txn_id = s.data_->txn_id_;
+    ASSERT_FALSE(session_data_txn_id.IsValid());
+
+    NO_FATALS(apply_single_insert(&s));
+
+    // Make sure current batcher has txn_id_ set to a non-valid transaction
+    // identifier.
+    ASSERT_NE(nullptr, s.data_->batcher_.get());
+    const auto& batcher_txn_id = s.data_->batcher_->txn_id();
+    ASSERT_FALSE(batcher_txn_id.IsValid());
+  }
+
+  // Check how relevant member fields are populated in case of
+  // transactional session.
+  {
+    const TxnId kTxnId(0);
+    KuduSession s(client_, kTxnId);
+
+    const auto& session_data_txn_id = s.data_->txn_id_;
+    ASSERT_TRUE(session_data_txn_id.IsValid());
+    ASSERT_EQ(kTxnId.value(), session_data_txn_id.value());
+
+    NO_FATALS(apply_single_insert(&s));
+
+    // Make sure current batcher has txn_id_ member properly set.
+    ASSERT_NE(nullptr, s.data_->batcher_.get());
+    const auto& batcher_txn_id = s.data_->batcher_->txn_id();
+    ASSERT_TRUE(batcher_txn_id.IsValid());
+    ASSERT_EQ(kTxnId.value(), batcher_txn_id.value());
+  }
+}
+
 // This test verifies that rows with column schema violations such as
 // unset non-nullable columns (with no default value) are detected at the client
 // side while calling Apply() for corresponding write operations. So, that sort
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 16819ac..6c6bce4 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1154,7 +1154,12 @@ KuduError::~KuduError() {
 ////////////////////////////////////////////////////////////
 
 KuduSession::KuduSession(const shared_ptr<KuduClient>& client)
-  : data_(new KuduSession::Data(client, client->data_->messenger_)) {
+    : data_(new KuduSession::Data(client, client->data_->messenger_)) {
+}
+
+KuduSession::KuduSession(const shared_ptr<KuduClient>& client,
+                         const TxnId& txn_id)
+    : data_(new KuduSession::Data(client, client->data_->messenger_, txn_id)) {
 }
 
 KuduSession::~KuduSession() {
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 92addfd..aa2a207 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -60,6 +60,7 @@ class MonoDelta;
 class Partition;
 class PartitionSchema;
 class SecurityUnknownTskTest;
+class TxnId;
 
 namespace client {
 class KuduClient;
@@ -2039,8 +2040,10 @@ class KUDU_EXPORT KuduSession : public 
sp::enable_shared_from_this<KuduSession>
   friend class ClientTest;
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
+  FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
 
   explicit KuduSession(const sp::shared_ptr<KuduClient>& client);
+  KuduSession(const sp::shared_ptr<KuduClient>& client, const TxnId& txn_id);
 
   // Owned.
   Data* data_;
diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc
index cc967e5..5baa4ed 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -35,24 +35,21 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/util/logging.h"
 
+
+using kudu::client::internal::Batcher;
+using kudu::client::internal::ErrorCollector;
+using kudu::client::sp::shared_ptr;
+using kudu::client::sp::weak_ptr;
+using kudu::rpc::Messenger;
 using std::unique_ptr;
 using strings::Substitute;
 
 namespace kudu {
-
-using rpc::Messenger;
-
 namespace client {
 
-using internal::Batcher;
-using internal::ErrorCollector;
-
-using sp::shared_ptr;
-using sp::weak_ptr;
-
-
 KuduSession::Data::Data(shared_ptr<KuduClient> client,
-                        std::weak_ptr<rpc::Messenger> messenger)
+                        std::weak_ptr<rpc::Messenger> messenger,
+                        const TxnId& txn_id)
     : client_(std::move(client)),
       messenger_(std::move(messenger)),
       error_collector_(new ErrorCollector()),
@@ -66,6 +63,7 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client,
       buffer_bytes_limit_(7 * 1024 * 1024),
       buffer_watermark_pct_(50),
       buffer_bytes_used_(0),
+      txn_id_(txn_id),
       buffer_pre_flush_enabled_(true) {
 }
 
@@ -475,7 +473,7 @@ Status KuduSession::Data::ApplyWriteOp(KuduWriteOperation* 
write_op) {
       // no thread-safety is advertised for the kudu::KuduSession interface.
       scoped_refptr<Batcher> batcher(
           new Batcher(client_.get(), error_collector_, session_,
-                      external_consistency_mode_));
+                      external_consistency_mode_, txn_id_));
       if (timeout_.Initialized()) {
         batcher->SetTimeout(timeout_);
       }
diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h
index e529f7d..db9de7b 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_CLIENT_SESSION_INTERNAL_H
-#define KUDU_CLIENT_SESSION_INTERNAL_H
+#pragma once
 
 #include <cstddef>
 #include <cstdint>
@@ -27,6 +26,7 @@
 #include "kudu/client/client.h"
 #include "kudu/client/error_collector.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/common/txn_id.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/condition_variable.h"
@@ -63,7 +63,8 @@ class KuduWriteOperation;
 class KuduSession::Data {
  public:
   explicit Data(sp::shared_ptr<KuduClient> client,
-                std::weak_ptr<rpc::Messenger> messenger);
+                std::weak_ptr<rpc::Messenger> messenger,
+                const kudu::TxnId& txn_id = kudu::TxnId::kInvalidTxnId);
 
   void Init(sp::weak_ptr<KuduSession> session);
 
@@ -241,9 +242,14 @@ class KuduSession::Data {
   // The total number of bytes used by buffered write operations.
   int64_t buffer_bytes_used_;  // protected by mutex_
 
+  // Transaction ID for this session's transaction (if any): txn_id_.IsValid()
+  // returns true only if the upper-level session is a transactional one.
+  const TxnId txn_id_;
+
  private:
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
+  FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
 
   bool buffer_pre_flush_enabled_; // Set to 'false' only in test scenarios.
 
@@ -252,5 +258,3 @@ class KuduSession::Data {
 
 } // namespace client
 } // namespace kudu
-
-#endif
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 94a50c6..9dd51c0 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -143,6 +143,9 @@ message WriteRequestPB {
 
   // An authorization token with which to authorize this request.
   optional security.SignedTokenPB authz_token = 6;
+
+  // The transaction ID associated with this write request, if any.
+  optional int64 txn_id = 7;
 }
 
 message WriteResponsePB {

Reply via email to