This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 39dba1f  KUDU-2612 proper handling of transient errors from TxnManager
39dba1f is described below

commit 39dba1fd2272b7508043b4fa52066d6123132f4e
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Nov 24 21:11:17 2020 -0800

    KUDU-2612 proper handling of transient errors from TxnManager
    
    This patch fixes the handling of ServiceUnavailable from TxnManager
    in the Kudu C++ client, addressing one of TODOs from [1].
    
    This patch also contains a new test scenario to cover the updated
    functionality.  I verified that without the changes in handling
    ServiceUnavailable from TxnManager this new test scenario failed.
    
    This is a follow-up to [1].
    
    [1] https://github.com/apache/kudu/commit/38aee4b1407b04b8c299768be0ae40d84a8e2a3a
    
    Change-Id: Ia97ddaaf598f2b6d7fcf5fcd42ecb836b82ed547
    Reviewed-on: http://gerrit.cloudera.org:8080/16783
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/client/client-test.cc              | 29 ++++++++++++++++++++++++++++
 src/kudu/client/txn_manager_proxy_rpc.cc    | 30 ++++++++++++++++++++++++++---
 src/kudu/master/txn_manager.proto           |  2 +-
 src/kudu/transactions/txn_status_manager.cc | 12 ++++++++++++
 4 files changed, 69 insertions(+), 4 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 647af55..cb9114e 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -143,6 +143,7 @@ DECLARE_bool(rpc_listen_on_unix_domain_socket);
 DECLARE_bool(rpc_trace_negotiation);
 DECLARE_bool(scanner_inject_service_unavailable_on_continue_scan);
 DECLARE_bool(txn_manager_enabled);
+DECLARE_bool(txn_manager_lazily_initialized);
 DECLARE_int32(flush_threshold_mb);
 DECLARE_int32(flush_threshold_secs);
 DECLARE_int32(heartbeat_interval_ms);
@@ -159,6 +160,7 @@ DECLARE_int32(scanner_inject_latency_on_each_batch_ms);
 DECLARE_int32(scanner_max_batch_size_bytes);
 DECLARE_int32(scanner_ttl_ms);
 DECLARE_int32(table_locations_ttl_ms);
+DECLARE_int32(txn_status_manager_inject_latency_load_from_tablet_ms);
 DECLARE_int64(live_row_count_for_testing);
 DECLARE_int64(on_disk_size_for_testing);
 DECLARE_string(location_mapping_cmd);
@@ -7159,6 +7161,33 @@ TEST_F(ClientTest, NoTxnManager) {
   }
 }
 
+class ClientTxnManagerProxyTest : public ClientTest {
+ public:
+  void SetUp() override {
+    // To avoid extra latency in addition to already injected ones, scenarios
+    // based on this setup can assume the initial txn status tablet is already
+    // created.
+    FLAGS_txn_manager_lazily_initialized = false;
+
+    // Inject latency into the process of loading txn status data from the
+    // backing tablet, so TxnStatusManager would respond with
+    // ServiceUnavailable() for some time right after starting up.
+    FLAGS_txn_status_manager_inject_latency_load_from_tablet_ms = 3000;
+
+    ClientTest::SetUp();
+    ASSERT_OK(cluster_->mini_master()->master()->WaitForTxnManagerInit());
+  }
+};
+
+// This is a scenario to verify the retry logic in the client: upon receiving
+// a ServiceUnavailable() error status, it should retry its RPCs to TxnManager
+// a bit later.
+TEST_F(ClientTxnManagerProxyTest, RetryOnServiceUnavailable) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+}
+
 // Client test that assigns locations to clients and tablet servers.
 // For now, assigns a uniform location to all clients and tablet servers.
 class ClientWithLocationTest : public ClientTest {
diff --git a/src/kudu/client/txn_manager_proxy_rpc.cc b/src/kudu/client/txn_manager_proxy_rpc.cc
index 7ad38e0..7ab55e8 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.cc
+++ b/src/kudu/client/txn_manager_proxy_rpc.cc
@@ -54,6 +54,7 @@ using kudu::transactions::CommitTransactionResponsePB;
 using kudu::transactions::GetTransactionStateRequestPB;
 using kudu::transactions::GetTransactionStateResponsePB;
 using kudu::transactions::TxnManagerServiceProxy;
+using kudu::transactions::TxnManagerErrorPB;
 using std::string;
 using strings::Substitute;
 
@@ -191,8 +192,8 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
   // Next, parse RPC errors that happened after the connection succeeded.
   // Note: RemoteErrors from the controller are guaranteed to also return error
   // responses, per RpcController's contract (see rpc_controller.h).
-  const ErrorStatusPB* err = retrier().controller().error_response();
   if (s.IsRemoteError()) {
+    const ErrorStatusPB* err = retrier().controller().error_response();
     CHECK(err);
     if (err->has_code() &&
         (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
@@ -206,12 +207,35 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
       }
       return true;
     }
+    // TODO(aserbin): report unsupported features in the error message if it
+    //                starts making sense: of course this code is forward
+    //                looking, but it's not clear how the detailed information
+    //                on missing features could help in making this error
+    //                message more actionable
     if (err->unsupported_feature_flags_size() > 0) {
-      s = Status::NotSupported(Substitute("cluster does not support $0",
-                                          rpc_name_));
+      s = Status::NotSupported("TxnManager is missing required features");
     }
   }
 
+  // Finally, parse generic application errors from TxnManager.
+  if (s.ok() && resp_->has_error()) {
+    const auto& app_err = resp_->error();
+    // As of now, TxnManager doesn't have any custom application error codes.
+    DCHECK_EQ(TxnManagerErrorPB::UNKNOWN_ERROR, app_err.code());
+    const auto app_status = StatusFromPB(app_err.status());
+    DCHECK(!app_status.ok());
+    if (app_status.IsServiceUnavailable()) {
+      if (multi_txn_manager) {
+        ResetTxnManagerAndRetry(app_status);
+      } else {
+        mutable_retrier()->DelayedRetry(this, app_status);
+      }
+      return true;
+    }
+    // All other cases are non-retriable: propagate the app_status and return.
+    s = app_status;
+  }
+
   warn_on_retry.cancel();
   *status = s;
   return false;
diff --git a/src/kudu/master/txn_manager.proto b/src/kudu/master/txn_manager.proto
index 03efa52..1915c85 100644
--- a/src/kudu/master/txn_manager.proto
+++ b/src/kudu/master/txn_manager.proto
@@ -37,7 +37,7 @@ message TxnManagerErrorPB {
   }
 
   // The error code.
-  required Code code = 1;
+  required Code code = 1 [ default = UNKNOWN_ERROR ];
 
   // The Status object for the error. This includes a text message that may be
   // more useful to present in log messages, though its error code is less
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index 74508a8..66a3c14 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -34,6 +34,7 @@
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/cow_object.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -45,6 +46,14 @@ DEFINE_uint32(transaction_keepalive_interval_ms, 5000,
               "know that a transaction is not abandoned");
 TAG_FLAG(transaction_keepalive_interval_ms, experimental);
 
+DEFINE_int32(txn_status_manager_inject_latency_load_from_tablet_ms, 0,
+             "Injects a random latency between 0 and this many milliseconds "
+             "when loading data from the txn status tablet replica backing "
+             "the instance of TxnStatusManager. This is a test-only flag, "
+             "do not use in production.");
+TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, hidden);
+TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, unsafe);
+
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tserver::TabletServerErrorPB;
@@ -120,6 +129,9 @@ Status TxnStatusManager::LoadFromTablet() {
   TransactionsMap txns_by_id;
   v.Release(&highest_txn_id, &txns_by_id);
 
+  MAYBE_INJECT_RANDOM_LATENCY(
+      FLAGS_txn_status_manager_inject_latency_load_from_tablet_ms);
+
   std::lock_guard<simple_spinlock> l(lock_);
   highest_txn_id_ = std::max(highest_txn_id, highest_txn_id_);
   txns_by_id_ = std::move(txns_by_id);

Reply via email to