This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6f5f100 [client] updated multi-row transaction API
6f5f100 is described below
commit 6f5f100c8443353c007ecf375b76ba940c687f98
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jan 8 22:42:54 2021 -0800
[client] updated multi-row transaction API
This patch removes KuduTransactionSerializer and switches to using
KuduTransaction::SerializationOptions instead.
Change-Id: Id646c41d60f385f079ebfaec2daf7c2a108306b2
Reviewed-on: http://gerrit.cloudera.org:8080/16948
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
Reviewed-by: Hao Hao <[email protected]>
Reviewed-by: Grant Henke <[email protected]>
---
src/kudu/client/client-test.cc | 29 ++---
src/kudu/client/client.cc | 43 ++++---
src/kudu/client/client.h | 208 +++++++++++++++++---------------
src/kudu/client/transaction-internal.cc | 22 +---
src/kudu/client/transaction-internal.h | 31 +++--
5 files changed, 170 insertions(+), 163 deletions(-)
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 60d9f1e..4d7e0ca 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -439,7 +439,7 @@ class ClientTest : public KuduTest {
// once the transaction orchestration is implemented
Status FinalizeCommitTransaction(const shared_ptr<KuduTransaction>& txn) {
string txn_token;
- RETURN_NOT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+ RETURN_NOT_OK(txn->Serialize(&txn_token));
TxnTokenPB token;
CHECK(token.ParseFromString(txn_token));
CHECK(token.has_txn_id());
@@ -7221,7 +7221,7 @@ TEST_F(ClientTest, TxnCommit) {
ASSERT_FALSE(is_complete);
ASSERT_TRUE(cs.IsIncomplete()) << cs.ToString();
ASSERT_STR_CONTAINS(cs.ToString(), "commit is still in progress");
- ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+ ASSERT_OK(txn->Serialize(&txn_token));
}
// Make sure the transaction isn't aborted once its KuduTransaction handle
@@ -7345,12 +7345,12 @@ TEST_F(ClientTest, TxnToken) {
ASSERT_GT(txn_keepalive_ms, 0);
string txn_token;
- ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+ ASSERT_OK(txn->Serialize(&txn_token));
// Serializing the same transaction again produces the same result.
{
string token;
- ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&token));
+ ASSERT_OK(txn->Serialize(&token));
ASSERT_EQ(txn_token, token);
}
@@ -7373,7 +7373,7 @@ TEST_F(ClientTest, TxnToken) {
// Make sure the KuduTransaction object deserialized from a token is fully
// functional.
string serdes_txn_token;
-
ASSERT_OK(KuduTransactionSerializer(serdes_txn).Serialize(&serdes_txn_token));
+ ASSERT_OK(serdes_txn->Serialize(&serdes_txn_token));
ASSERT_EQ(txn_token, serdes_txn_token);
// TODO(awong): remove once we register participants automatically before
@@ -7389,7 +7389,7 @@ TEST_F(ClientTest, TxnToken) {
// The state of a transaction isn't stored in the token, so initiating
// commit of the transaction doesn't change the result of the
serialization.
string token;
- ASSERT_OK(KuduTransactionSerializer(serdes_txn).Serialize(&token));
+ ASSERT_OK(serdes_txn->Serialize(&token));
ASSERT_EQ(serdes_txn_token, token);
}
@@ -7397,14 +7397,14 @@ TEST_F(ClientTest, TxnToken) {
shared_ptr<KuduTransaction> other_txn;
ASSERT_OK(client_->NewTransaction(&other_txn));
string other_txn_token;
- ASSERT_OK(KuduTransactionSerializer(other_txn).Serialize(&other_txn_token));
+ ASSERT_OK(other_txn->Serialize(&other_txn_token));
ASSERT_NE(txn_token, other_txn_token);
// The state of a transaction isn't stored in the token, so aborting
// the doesn't change the result of the serialization.
string token;
ASSERT_OK(other_txn->Rollback());
- ASSERT_OK(KuduTransactionSerializer(other_txn).Serialize(&token));
+ ASSERT_OK(other_txn->Serialize(&token));
ASSERT_EQ(other_txn_token, token);
}
@@ -7413,11 +7413,12 @@ TEST_F(ClientTest, TxnToken) {
// Status::NotAuthorized() status.
TEST_F(ClientTest, AttemptToControlTxnByOtherUser) {
static constexpr const char* const kOtherTxnUser = "other-txn-user";
+ const KuduTransaction::SerializationOptions kSerOptions;
shared_ptr<KuduTransaction> txn;
ASSERT_OK(client_->NewTransaction(&txn));
string txn_token;
- ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+ ASSERT_OK(txn->Serialize(&txn_token));
// Transaction identifier is surfacing here only to build the reference error
// message for Status::NotAuthorized() returned by attempts to perform
@@ -7533,7 +7534,7 @@ TEST_F(ClientTest, TxnKeepAlive) {
{
shared_ptr<KuduTransaction> txn;
ASSERT_OK(client_->NewTransaction(&txn));
- ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+ ASSERT_OK(txn->Serialize(&txn_token));
}
SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
@@ -7557,7 +7558,7 @@ TEST_F(ClientTest, TxnKeepAlive) {
{
shared_ptr<KuduTransaction> txn;
ASSERT_OK(client_->NewTransaction(&txn));
- ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+ ASSERT_OK(txn->Serialize(&txn_token));
}
shared_ptr<KuduTransaction> serdes_txn;
@@ -7583,9 +7584,9 @@ TEST_F(ClientTest, TxnKeepAlive) {
{
shared_ptr<KuduTransaction> txn;
ASSERT_OK(client_->NewTransaction(&txn));
- ASSERT_OK(KuduTransactionSerializer(txn)
- .enable_keepalive(true)
- .Serialize(&txn_token));
+ KuduTransaction::SerializationOptions options;
+ options.enable_keepalive(true);
+ ASSERT_OK(txn->Serialize(&txn_token, options));
}
shared_ptr<KuduTransaction> serdes_txn;
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index ed25587..18fbced 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -399,6 +399,24 @@ Status KuduClientBuilder::Build(shared_ptr<KuduClient>*
client) {
return Status::OK();
}
+KuduTransaction::SerializationOptions::SerializationOptions()
+ : data_(new Data) {
+}
+
+KuduTransaction::SerializationOptions::~SerializationOptions() {
+ delete data_;
+}
+
+bool KuduTransaction::SerializationOptions::keepalive() const {
+ return data_->enable_keepalive_;
+}
+
+KuduTransaction::SerializationOptions&
+KuduTransaction::SerializationOptions::enable_keepalive(bool enable) {
+ data_->enable_keepalive_ = enable;
+ return *this;
+}
+
KuduTransaction::KuduTransaction(const sp::shared_ptr<KuduClient>& client)
: data_(new KuduTransaction::Data(client)) {
}
@@ -424,31 +442,18 @@ Status KuduTransaction::Rollback() {
return data_->Rollback();
}
+Status KuduTransaction::Serialize(
+ string* serialized_txn,
+ const SerializationOptions& options) const {
+ return data_->Serialize(serialized_txn, options);
+}
+
Status KuduTransaction::Deserialize(const sp::shared_ptr<KuduClient>& client,
const string& serialized_txn,
sp::shared_ptr<KuduTransaction>* txn) {
return Data::Deserialize(client, serialized_txn, txn);
}
-KuduTransactionSerializer::KuduTransactionSerializer(
- const sp::shared_ptr<KuduTransaction>& txn)
- : data_(new KuduTransactionSerializer::Data(txn)) {
-}
-
-KuduTransactionSerializer::~KuduTransactionSerializer() {
- delete data_;
-}
-
-KuduTransactionSerializer&
-KuduTransactionSerializer::enable_keepalive(bool enable) {
- data_->enable_keepalive(enable);
- return *this;
-}
-
-Status KuduTransactionSerializer::Serialize(string* serialized_txn) const {
- return data_->Serialize(serialized_txn);
-}
-
KuduClient::KuduClient()
: data_(new KuduClient::Data()) {
static ObjectIdGenerator oid_generator;
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 504c5c3..04f1925 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -415,13 +415,120 @@ class KUDU_EXPORT KuduTransaction :
/// @return Operation result status.
Status Rollback() WARN_UNUSED_RESULT;
+ /// This class controls serialization-related parameters for a Kudu
+ /// transaction handle (i.e. @c KuduTransaction).
+ ///
+ /// One of the parameters is whether to enable sending keepalive messages for
+ /// the resulting @c KuduTransaction handle upon deserialization.
+ /// In future, the list of configurable parameters might be extended (e.g.,
+ /// by adding commit and abort permissions, i.e. whether a handle obtained by
+ /// deserializing a handle from the string representation can be used
+ /// to commit and/or abort the transaction).
+ class KUDU_EXPORT SerializationOptions {
+ public:
+ SerializationOptions();
+ ~SerializationOptions();
+
+ /// This method returns the current keepalive setting, i.e. whether
+ /// to send keepalive messages for Kudu transaction handles.
+ ///
+ /// No keepalive heartbeat messages are sent from a transaction handle if
+ /// its token was created with the default "keepalive disabled" setting.
+ /// The idea here is that the most common use case for using transaction
+ /// tokens is of the "star topology" (see below), so it's enough to have
+ /// just one top-level handle sending keepalive messages. Overall, having
+ /// more than one actor sending keepalive messages for a transaction is
+ /// acceptable but it puts needless load on a cluster.
+ ///
+ /// The most common use case for a transaction's handle
+ /// serialization/deserialization is of the "star topology": a transaction
+ /// is started by a top-level application which sends the transaction token
+ /// produced by serializing the original transaction handle to other worker
+ /// applications running concurrently, where the latter write their data
+ /// in the context of the same transaction and report back to the top-level
+ /// application, which in its turn initiates committing the transaction
+ /// as needed. The important point is that the top-level application keeps
+ /// the transaction handle around all the time from the start of the
+ /// transaction to the very point when transaction is committed. Under the
+ /// hood, the original transaction handle sends keepalive messages as
+ /// required until commit phase is initiated, so the deserialized
+ /// transaction handles which are used by the worker applications don't
+ /// need to send keepalive messages.
+ ///
+ /// The other (less common) use case is of the "ring topology": a chain of
+ /// applications work sequentially as a part of the same transaction, where
+ /// the very first application starts the transaction, writes its data, and
+ /// hands over the responsibility of managing the lifecycle of the
+ /// transaction to another application down the chain. After doing so it may
+ /// exit, so now only the next application has the active transaction
+ /// handle, and so on it goes until the transaction is committed by the
+ /// application in the end of the chain. In this scenario, every
+ /// deserialized handle has to send keepalive messages to avoid automatic
+ /// rollback of the transaction, and every application in the chain should
+ /// set @c SerializationOptions::enable_keepalive to true when serializing
+ /// its transaction handle into a transaction token to pass to the
+ /// application next in the chain.
+ ///
+ /// @return whether to send keepalive messages for Kudu transaction handles
+ bool keepalive() const;
+
+ /// Enable/disable keepalive for a handle which is the result of
+ /// deserializing a previously serialized @c KuduTransaction handle.
+ ///
+ /// Sending keepalive messages for a transaction handle deserialized
+ /// from a string is disabled by default.
+ ///
+ /// @param [in] enable
+ /// Whether to enable sending keepalive messages for @c KuduTransaction
+ /// handle once it's deserialized from the string representation of a
+ /// Kudu transaction handle.
+ /// @return Reference to the updated object.
+ SerializationOptions& enable_keepalive(bool enable);
+
+ private:
+ friend class KuduTransaction;
+ class KUDU_NO_EXPORT Data;
+ Data* data_; // Owned.
+
+ DISALLOW_COPY_AND_ASSIGN(SerializationOptions);
+ };
+
+ /// Export the information on this transaction in a serialized form.
+ ///
+ /// The serialized information on a Kudu transaction can be passed among
+ /// different Kudu clients running at multiple nodes, so those separate
+ /// Kudu clients can perform operations to be a part of the same distributed
+ /// transaction. The resulting string is referred to as "transaction token" and
+ /// can be deserialized into a transaction handle (i.e. an object of the @c
+ /// KuduTransaction class) via the @c KuduTransaction::Deserialize() method.
+ ///
+ /// This method doesn't perform any RPC under the hood. The behavior of this
+ /// method is controlled by the @c SerializationOptions passed via the
+ /// @c options parameter.
+ ///
+ /// @note The representation of the data in the serialized form
+ /// (i.e. the format of a Kudu transaction token) is an implementation
+ /// detail, not a part of the public API.
+ ///
+ /// @param [out] serialized_txn
+ /// Result string to output the serialized transaction information.
+ /// @param [in] options
+ /// Options to use when serializing the handle (optional). If omitted,
+ /// the default serialization parameters are used -- the same as it would
+ /// be for a default-constructed instance of SerializationOptions used
+ /// for this parameter.
+ /// @return Operation result status.
+ Status Serialize(
+ std::string* serialized_txn,
+ const SerializationOptions& options = SerializationOptions()) const
WARN_UNUSED_RESULT;
+
/// Re-create KuduTransaction object given its serialized representation.
///
/// This method doesn't perform any RPC under the hood. The newly created
/// object automatically does or does not send keep-alive messages depending
- /// on the @c KuduTransactionSerializer::enable_keepalive() setting when
- /// the original @c KuduTransaction object was serialized using
- /// @c KuduTransactionSerializer::Serialize().
+ /// on the @c KuduTransaction::SerializationOptions::enable_keepalive()
+ /// setting when the original @c KuduTransaction object was serialized using
+ /// @c KuduTransaction::Serialize().
///
/// @param [in] client
/// Client instance to bound the result object to.
@@ -434,9 +541,10 @@ class KUDU_EXPORT KuduTransaction :
const std::string& serialized_txn,
sp::shared_ptr<KuduTransaction>* txn)
WARN_UNUSED_RESULT;
private:
+ DISALLOW_COPY_AND_ASSIGN(KuduTransaction);
+
friend class KuduClient;
friend class KuduSession;
- friend class KuduTransactionSerializer;
FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
FRIEND_TEST(ClientTest, TxnToken);
@@ -446,96 +554,6 @@ class KUDU_EXPORT KuduTransaction :
Data* data_; // Owned.
};
-/// A utility class to help with serialization of @c KuduTransaction handles.
-///
-/// The purpose of this class is to control the keepalive behavior for the
-/// @c KuduTransaction handle once it's deserialized from the token. In future,
-/// the list of configurable parameters might be extended (e.g., by adding
-/// commit and abort permissions, i.e. whether a handle can be used to
-/// commit and/or abort the transaction).
-class KUDU_EXPORT KuduTransactionSerializer {
- public:
- /// Create a serializer for the specified @c KuduTransaction object.
- ///
- /// @param [out] txn
- /// Smart pointer to the @c KuduTransaction object to be serialized
- /// by the @c Serialize() method.
- explicit KuduTransactionSerializer(const sp::shared_ptr<KuduTransaction>&
txn);
-
- ~KuduTransactionSerializer();
-
- /// Whether to enable sending keepalive messages for the @c KuduTransaction
- /// handle when it's deserialized from the string produced by
- /// @c KuduTransactionSerializer::Serialize().
- ///
- /// No keepalive heartbeat messages are sent from a transaction handle whose
- /// source token was created with the default "keepalive disabled" setting.
- /// The idea here is that the most common use case for using transaction
- /// tokens is of the "start topology" (see below), so it's enough to have
- /// just one top-level handle sending keepalive messages. Overall, having
more
- /// than one actor sending keepalive messages for a transaction is acceptable
- /// but it puts needless load on a cluster.
- ///
- /// The most common use case for a transaction's handle
- /// serialization/deserialization is of the "star topology": a transaction is
- /// started by a top-level application which sends the transaction token
- /// produced by serializing the original transaction handle to other worker
- /// applications running concurrently, where the latter write their data
- /// in the context of the same transaction and report back to the top-level
- /// application, which in its turn initiates committing the transaction
- /// as needed. The important point is that the top-level application keeps
the
- /// transaction handle around all the time from the start of the transaction
- /// to the very point when transaction is committed. Under the hood, the
- /// original transaction handle sends keepalive messages as required until
- /// commit phase is initiated, so the deserialized transaction handles which
- /// are used by the worker applications don't need to send keepalive
messages.
- ///
- /// The other (less common) use case is of the "ring topology": a chain of
- /// applications work sequentially as a part of the same transaction, where
- /// the very first application starts the transaction, writes its data, and
- /// hands over the responsibility of managing the lifecycle of the
transaction
- /// to other application down the chain. After doing so it may exit, so now
- /// only the next application has the active transaction handle, and so on it
- /// goes until the transaction is committed by the application in the end
- /// of the chain. In this scenario, every deserialized handle have to send
- /// keepalive messages to avoid automatic rollback of the transaction,
- /// and every application in the chain should call
- /// @c KuduTransactionSerializer::enable_keepalive() when serializing its
- /// transaction handle into a transaction token to pass to the application
- /// next in the chain.
- ///
- /// @param [in] enable
- /// Whether to enable sending keepalive messages for the @c KuduTransaction
- /// object once it's deserialized from the string to be produced by
- /// @c KuduTransactionSerializer::Serialize().
- /// @return Reference to the updated object.
- KuduTransactionSerializer& enable_keepalive(bool enable);
-
- /// Export the information on this transaction in a serialized form.
- ///
- /// The serialized information on a Kudu transaction can be passed among
- /// different Kudu clients running at multiple nodes, so those separate
- /// Kudu clients can perform operations to be a part of the same distributed
- /// transaction. The resulting string is referred as "transaction token" and
- /// it can be deserialized into an object of the @c KuduTransaction class via
- /// the @c KuduTransaction::Deserialize() method.
- ///
- /// This method doesn't perform any RPC under the hood.
- ///
- /// @note The representation of the data in the serialized form
- /// (i.e. the format of a Kudu transaction token) is an implementation
- /// detail, not a part of the public API.
- ///
- /// @param [out] serialized_txn
- /// Result string to output the serialized transaction information.
- /// @return Operation result status.
- Status Serialize(std::string* serialized_txn) const WARN_UNUSED_RESULT;
-
- private:
- class KUDU_NO_EXPORT Data;
- Data* data_; // Owned.
-};
-
/// @brief A handle for a connection to a cluster.
///
/// The KuduClient class represents a connection to a cluster. From the user
@@ -710,7 +728,7 @@ class KUDU_EXPORT KuduClient : public
sp::enable_shared_from_this<KuduClient> {
/// This 'out' parameter is populated iff the operation to begin
/// a transaction was successful.
/// @return The status of underlying "begin transaction" operation.
- Status NewTransaction(sp::shared_ptr<KuduTransaction>* txn);
+ Status NewTransaction(sp::shared_ptr<KuduTransaction>* txn)
WARN_UNUSED_RESULT;
/// @cond PRIVATE_API
diff --git a/src/kudu/client/transaction-internal.cc
b/src/kudu/client/transaction-internal.cc
index 85c732b..7d88734 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -214,14 +214,15 @@ Status KuduTransaction::Data::Rollback() {
return Status::OK();
}
-Status KuduTransaction::Data::Serialize(string* serialized_txn,
- bool enable_keepalive) const {
+Status KuduTransaction::Data::Serialize(
+ string* serialized_txn,
+ const SerializationOptions& options) const {
DCHECK(serialized_txn);
DCHECK(txn_id_.IsValid());
TxnTokenPB token;
token.set_txn_id(txn_id_);
token.set_keepalive_millis(txn_keep_alive_ms_);
- token.set_enable_keepalive(enable_keepalive);
+ token.set_enable_keepalive(options.keepalive());
if (!token.SerializeToString(serialized_txn)) {
return Status::Corruption("unable to serialize transaction information");
}
@@ -453,20 +454,5 @@ void KuduTransaction::Data::TxnKeepAliveCb(
}
}
-KuduTransactionSerializer::Data::Data(
- const sp::shared_ptr<KuduTransaction>& txn)
- : txn_(txn),
- enable_keepalive_(false) {
- DCHECK(txn_);
-}
-
-Status KuduTransactionSerializer::Data::Serialize(string* txn_token) const {
- DCHECK(txn_token);
- if (PREDICT_FALSE(!txn_)) {
- return Status::InvalidArgument("no transaction to serialize");
- }
- return txn_->data_->Serialize(txn_token, enable_keepalive_);
-}
-
} // namespace client
} // namespace kudu
diff --git a/src/kudu/client/transaction-internal.h
b/src/kudu/client/transaction-internal.h
index ce0db97..19bb414 100644
--- a/src/kudu/client/transaction-internal.h
+++ b/src/kudu/client/transaction-internal.h
@@ -34,6 +34,18 @@ namespace client {
struct KeepaliveRpcCtx;
+// This class hides the implementation details of the publicly exposed
+// kudu::client::KuduTransaction::SerializationOptions class.
+class KuduTransaction::SerializationOptions::Data {
+ public:
+ Data()
+ : enable_keepalive_(false) {
+ }
+ ~Data() = default;
+
+ bool enable_keepalive_;
+};
+
// This class implements the functionality of kudu::client::KuduTransaction.
class KuduTransaction::Data {
public:
@@ -46,7 +58,8 @@ class KuduTransaction::Data {
Status IsCommitComplete(bool* is_complete, Status* completion_status);
Status Rollback();
- Status Serialize(std::string* serialized_txn, bool enable_keepalive) const;
+ Status Serialize(std::string* serialized_txn,
+ const SerializationOptions& options) const;
static Status Deserialize(const sp::shared_ptr<KuduClient>& client,
const std::string& serialized_txn,
sp::shared_ptr<KuduTransaction>* txn);
@@ -85,21 +98,5 @@ class KuduTransaction::Data {
DISALLOW_COPY_AND_ASSIGN(Data);
};
-// This class implements the functionality of
-// kudu::client::KuduTransactionSerializer.
-class KuduTransactionSerializer::Data {
- public:
- explicit Data(const sp::shared_ptr<KuduTransaction>& client);
-
- void enable_keepalive(bool enable) {
- enable_keepalive_ = enable;
- }
-
- Status Serialize(std::string* txn_token) const;
-
- sp::shared_ptr<KuduTransaction> txn_;
- bool enable_keepalive_;
-};
-
} // namespace client
} // namespace kudu