This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6c049687f KUDU-3500 don't start operations timed out in prepare queue
6c049687f is described below
commit 6c049687f60e90cbdac6f6ec039a528d13664a6b
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Jul 19 22:02:36 2023 -0700
KUDU-3500 don't start operations timed out in prepare queue
While troubleshooting a performance issue where the prepare queue for
a tablet was very long, I noticed that tablet servers start write
operations that correspond to RPCs that have already timed out. Most
likely, the client that sent the RPC had already detected the timeout
and expected that the write would have failed already.
As a simple optimization, this patch updates the logic of the OpDriver
class to respond with TimedOut error status right away when a write
operation that has already timed out while waiting in the prepare queue
is dispatched to the prepare thread. That helps with clearing the queue
and processing not-yet-timed-out requests from the queue faster,
increasing the overall robustness of a tablet server when the load
is high and the node's CPU and disk IO bandwidth are saturated.
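A new tablet metric 'ops_timed_out_in_prepare_queue' is introduced to
track the number of WriteRequestPB RPCs that timed out in the tablet's
prepare queue and were responded to with TimedOut error status even before
the PREPARE phase for the corresponding operation started. Since the RPC
deadline is propagated for AlterSchema requests as well, operations for
those requests that time out in the prepare queue are counted too.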
This patch also adds a new test to cover the new functionality.
Change-Id: I202ce6b5e425439b50c0751d7f7406e69b8e751a
Reviewed-on: http://gerrit.cloudera.org:8080/20300
Tested-by: Kudu Jenkins
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/tablet/ops/op_driver.cc | 19 +++++--
src/kudu/tablet/ops/op_driver.h | 7 ++-
src/kudu/tablet/ops/write_op.cc | 12 +++++
src/kudu/tablet/tablet_metrics.cc | 10 ++++
src/kudu/tablet/tablet_metrics.h | 1 +
src/kudu/tablet/tablet_replica-test.cc | 3 +-
src/kudu/tablet/tablet_replica.cc | 18 ++++---
src/kudu/tablet/tablet_replica.h | 20 ++++---
src/kudu/tablet/txn_participant-test.cc | 3 +-
src/kudu/tserver/tablet_server-test.cc | 93 +++++++++++++++++++++++++++++++++
src/kudu/tserver/tablet_service.cc | 5 +-
11 files changed, 170 insertions(+), 21 deletions(-)
diff --git a/src/kudu/tablet/ops/op_driver.cc b/src/kudu/tablet/ops/op_driver.cc
index 73a7f7f21..b3e1aa740 100644
--- a/src/kudu/tablet/ops/op_driver.cc
+++ b/src/kudu/tablet/ops/op_driver.cc
@@ -43,9 +43,11 @@
#include "kudu/tablet/op_order_verifier.h"
#include "kudu/tablet/ops/op_tracker.h"
#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status_callback.h"
#include "kudu/util/threadpool.h"
@@ -107,7 +109,8 @@ OpDriver::OpDriver(OpTracker *op_tracker,
Log* log,
ThreadPoolToken* prepare_pool_token,
ThreadPool* apply_pool,
- OpOrderVerifier* order_verifier)
+ OpOrderVerifier* order_verifier,
+ MonoTime deadline)
: op_tracker_(op_tracker),
consensus_(consensus),
log_(log),
@@ -115,6 +118,7 @@ OpDriver::OpDriver(OpTracker *op_tracker,
apply_pool_(apply_pool),
order_verifier_(order_verifier),
trace_(new Trace()),
+ deadline_(deadline),
start_time_(MonoTime::Now()),
replication_state_(NOT_REPLICATING),
prepare_state_(NOT_PREPARED) {
@@ -233,9 +237,18 @@ void OpDriver::ExecuteAsync() {
void OpDriver::PrepareTask() {
TRACE_EVENT_FLOW_END0("op", "PrepareTask", this);
- Status prepare_status = Prepare();
+ // 'deadline_' is propagated from the client RPC (MonoTime::Max() if none was
+ // specified). If it has already passed while this op was waiting in the
+ // prepare queue, respond with TimedOut right away instead of starting the
+ // PREPARE phase for an op whose client has presumably given up on it.
+ if (PREDICT_FALSE(deadline_ <= MonoTime::Now())) {
+ // Built once and reused for every op that times out in the prepare queue.
+ static const Status kTimedOut = Status::TimedOut(
+ "operation timed out while waiting in prepare queue");
+ // The tablet's metrics may be absent (null), so count only when present.
+ if (auto* m = op_->state()->tablet_replica()->tablet()->metrics();
+ PREDICT_TRUE(m)) {
+ m->ops_timed_out_in_prepare_queue->Increment();
+ }
+ return HandleFailure(kTimedOut);
+ }
+ const auto prepare_status = Prepare();
if (PREDICT_FALSE(!prepare_status.ok())) {
- HandleFailure(prepare_status);
+ return HandleFailure(prepare_status);
}
}
diff --git a/src/kudu/tablet/ops/op_driver.h b/src/kudu/tablet/ops/op_driver.h
index 8b58c119f..9fba85303 100644
--- a/src/kudu/tablet/ops/op_driver.h
+++ b/src/kudu/tablet/ops/op_driver.h
@@ -229,7 +229,8 @@ class OpDriver : public RefCountedThreadSafe<OpDriver> {
log::Log* log,
ThreadPoolToken* prepare_pool_token,
ThreadPool* apply_pool,
- OpOrderVerifier* order_verifier);
+ OpOrderVerifier* order_verifier,
+ MonoTime deadline = MonoTime::Max());
// Perform any non-constructor initialization. Sets the op that will be
// executed.
@@ -378,6 +379,10 @@ class OpDriver : public RefCountedThreadSafe<OpDriver> {
// Trace object for tracing any ops started by this driver.
scoped_refptr<Trace> trace_;
+ // Deadline for the operation, if any. It's propagated from the corresponding
+ // client RPC, and set to MonoTime::Max() if no deadline has been specified.
+ const MonoTime deadline_;
+
const MonoTime start_time_;
MonoTime replication_start_time_;
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index 343da89f7..208f33c07 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -72,6 +72,12 @@ DEFINE_int32(tablet_inject_latency_on_apply_write_op_ms, 0,
TAG_FLAG(tablet_inject_latency_on_apply_write_op_ms, unsafe);
TAG_FLAG(tablet_inject_latency_on_apply_write_op_ms, runtime);
+DEFINE_int32(tablet_inject_latency_on_prepare_write_op_ms, 0,
+ "How much latency to inject when a write op is prepared. "
+ "For testing only!");
+TAG_FLAG(tablet_inject_latency_on_prepare_write_op_ms, unsafe);
+TAG_FLAG(tablet_inject_latency_on_prepare_write_op_ms, runtime);
+
DECLARE_bool(enable_txn_partition_lock);
using std::optional;
@@ -165,6 +171,12 @@ void WriteOp::NewReplicateMsg(unique_ptr<ReplicateMsg>*
replicate_msg) {
Status WriteOp::Prepare() {
TRACE_EVENT0("op", "WriteOp::Prepare");
+ // Test-only hook: delay the PREPARE phase of every write op by the
+ // configured number of milliseconds. The branch hint applies to the whole
+ // comparison, which is false in production (flag defaults to 0).
+ if (PREDICT_FALSE(FLAGS_tablet_inject_latency_on_prepare_write_op_ms > 0)) {
+ TRACE("injecting $0ms of latency due to --tablet_inject_latency_on_prepare_write_op_ms",
+ FLAGS_tablet_inject_latency_on_prepare_write_op_ms);
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_inject_latency_on_prepare_write_op_ms));
+ }
+
Tablet* tablet = state()->tablet_replica()->tablet();
TRACE(Substitute("PREPARE: starting on tablet $0", tablet->tablet_id()));
// Decode everything first so that we give up if something major is wrong.
diff --git a/src/kudu/tablet/tablet_metrics.cc
b/src/kudu/tablet/tablet_metrics.cc
index ce94985af..71a405490 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -188,6 +188,15 @@ METRIC_DEFINE_counter(tablet,
deleted_rowset_gc_bytes_deleted,
"Number of bytes deleted by garbage-collecting deleted
rowsets.",
kudu::MetricLevel::kDebug);
+METRIC_DEFINE_counter(tablet, ops_timed_out_in_prepare_queue,
+ "Number of Requests Timed Out In Prepare Queue",
+ kudu::MetricUnit::kRequests,
+ "Number of write and alter schema requests that timed out "
+ "while their corresponding operations were waiting in the "
+ "tablet's prepare queue, and thus were not started but "
+ "acknowledged with TimedOut error status.",
+ kudu::MetricLevel::kInfo);
+
METRIC_DEFINE_histogram(tablet, bloom_lookups_per_op, "Bloom Lookups per
Operation",
kudu::MetricUnit::kProbes,
"Tracks the number of bloom filter lookups performed
by each "
@@ -424,6 +433,7 @@ TabletMetrics::TabletMetrics(const
scoped_refptr<MetricEntity>& entity)
MINIT(bytes_flushed),
MINIT(deleted_rowset_gc_bytes_deleted),
MINIT(undo_delta_block_gc_bytes_deleted),
+ MINIT(ops_timed_out_in_prepare_queue),
MINIT(bloom_lookups_per_op),
MINIT(key_file_lookups_per_op),
MINIT(delta_file_lookups_per_op),
diff --git a/src/kudu/tablet/tablet_metrics.h b/src/kudu/tablet/tablet_metrics.h
index b277d4284..d8d030bae 100644
--- a/src/kudu/tablet/tablet_metrics.h
+++ b/src/kudu/tablet/tablet_metrics.h
@@ -76,6 +76,7 @@ struct TabletMetrics {
scoped_refptr<Counter> bytes_flushed;
scoped_refptr<Counter> deleted_rowset_gc_bytes_deleted;
scoped_refptr<Counter> undo_delta_block_gc_bytes_deleted;
+ scoped_refptr<Counter> ops_timed_out_in_prepare_queue;
scoped_refptr<Histogram> bloom_lookups_per_op;
scoped_refptr<Histogram> key_file_lookups_per_op;
diff --git a/src/kudu/tablet/tablet_replica-test.cc
b/src/kudu/tablet/tablet_replica-test.cc
index bd6670ce9..998b27b2f 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -431,7 +431,8 @@ TEST_F(TabletReplicaTest, TestActiveOpPreventsLogGC) {
scoped_refptr<OpDriver> driver;
ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op),
- &driver));
+ &driver,
+ MonoTime::Max()));
driver->ExecuteAsync();
apply_started.Wait();
ASSERT_TRUE(driver->GetOpId().IsInitialized())
diff --git a/src/kudu/tablet/tablet_replica.cc
b/src/kudu/tablet/tablet_replica.cc
index a7119c7ac..a5fe99a5d 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -591,13 +591,14 @@ Status TabletReplica::UnregisterTxnOpDispatcher(int64_t
txn_id,
return unregister_status;
}
-Status TabletReplica::SubmitWrite(unique_ptr<WriteOpState> op_state) {
+Status TabletReplica::SubmitWrite(unique_ptr<WriteOpState> op_state,
+ MonoTime deadline) {
RETURN_NOT_OK(CheckRunning());
op_state->SetResultTracker(result_tracker_);
unique_ptr<WriteOp> op(new WriteOp(std::move(op_state), consensus::LEADER));
scoped_refptr<OpDriver> driver;
- RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver));
+ // Hand the RPC deadline to the op driver, so that an op which times out
+ // while still waiting in the prepare queue is failed instead of started.
+ RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, deadline));
driver->ExecuteAsync();
return Status::OK();
}
@@ -608,18 +609,19 @@ Status
TabletReplica::SubmitTxnParticipantOp(std::unique_ptr<ParticipantOpState>
op_state->SetResultTracker(result_tracker_);
unique_ptr<ParticipantOp> op(new ParticipantOp(std::move(op_state),
consensus::LEADER));
scoped_refptr<OpDriver> driver;
- RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver));
+ RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, MonoTime::Max()));
driver->ExecuteAsync();
return Status::OK();
}
-Status TabletReplica::SubmitAlterSchema(unique_ptr<AlterSchemaOpState> state) {
+Status TabletReplica::SubmitAlterSchema(unique_ptr<AlterSchemaOpState> state,
+ MonoTime deadline) {
RETURN_NOT_OK(CheckRunning());
unique_ptr<AlterSchemaOp> op(
new AlterSchemaOp(std::move(state), consensus::LEADER));
scoped_refptr<OpDriver> driver;
- RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver));
+ // Hand the RPC deadline to the op driver, so that an op which times out
+ // while still waiting in the prepare queue is failed instead of started.
+ RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, deadline));
driver->ExecuteAsync();
return Status::OK();
}
@@ -897,14 +899,16 @@ void
TabletReplica::FinishConsensusOnlyRound(ConsensusRound* round) {
}
Status TabletReplica::NewLeaderOpDriver(unique_ptr<Op> op,
- scoped_refptr<OpDriver>* driver) {
+ scoped_refptr<OpDriver>* driver,
+ MonoTime deadline) {
scoped_refptr<OpDriver> op_driver = new OpDriver(
&op_tracker_,
consensus_.get(),
log_.get(),
prepare_pool_token_.get(),
apply_pool_,
- &op_order_verifier_);
+ &op_order_verifier_,
+ deadline);
RETURN_NOT_OK(op_driver->Init(std::move(op), consensus::LEADER));
*driver = std::move(op_driver);
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 09985a822..d44f47429 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -46,6 +46,7 @@
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
@@ -53,7 +54,6 @@ class AlterTableTest;
class DnsResolver;
class MaintenanceManager;
class MaintenanceOp;
-class MonoDelta;
class ThreadPool;
class ThreadPoolToken;
class TxnOpDispatcherITest;
@@ -180,8 +180,10 @@ class TabletReplica : public
RefCountedThreadSafe<TabletReplica>,
// Submits a write to a tablet and executes it asynchronously.
// The caller is expected to build and pass a WriteOpState that points to the
- // RPC's WriteRequest, and WriteResponse.
- Status SubmitWrite(std::unique_ptr<WriteOpState> op_state);
+ // RPC's WriteRequest, and WriteResponse. The 'deadline' parameter
corresponds
+ // to the timeout of the corresponding RPC, if present/specified.
+ Status SubmitWrite(std::unique_ptr<WriteOpState> op_state,
+ MonoTime deadline = MonoTime::Max());
// Submits an op to update transaction participant state, executing it
// asynchonously.
@@ -190,7 +192,8 @@ class TabletReplica : public
RefCountedThreadSafe<TabletReplica>,
// Called by the tablet service to start an alter schema op.
//
// The op contains all the information required to execute the
- // AlterSchema operation and send the response back.
+ // AlterSchema operation and send the response back. The 'deadline' parameter
+ // corresponds to the timeout of the corresponding RPC, if present/specified.
//
// If the returned Status is OK, the response to the client will be sent
// asynchronously. Otherwise the tablet service will have to send the
response directly.
@@ -198,7 +201,8 @@ class TabletReplica : public
RefCountedThreadSafe<TabletReplica>,
// The AlterSchema operation is taking the tablet component lock in
exclusive mode
// meaning that no other operation on the tablet can be executed while the
// AlterSchema is in progress.
- Status SubmitAlterSchema(std::unique_ptr<AlterSchemaOpState> op_state);
+ Status SubmitAlterSchema(std::unique_ptr<AlterSchemaOpState> op_state,
+ MonoTime deadline = MonoTime::Max());
void GetTabletStatusPB(TabletStatusPB* status_pb_out) const;
@@ -308,8 +312,12 @@ class TabletReplica : public
RefCountedThreadSafe<TabletReplica>,
// Convenience method to return the permanent_uuid of this peer.
std::string permanent_uuid() const { return
tablet_->metadata()->fs_manager()->uuid(); }
+ // The 'deadline' parameter is used to short-circuit processing of operations
+ // that have timed out while waiting in the prepare queue. For most call
+ // sites, the deadline is naturally defined by the corresponding RPC; pass
+ // MonoTime::Max() when there is no deadline.
Status NewLeaderOpDriver(std::unique_ptr<Op> op,
- scoped_refptr<OpDriver>* driver);
+ scoped_refptr<OpDriver>* driver,
+ MonoTime deadline);
Status NewReplicaOpDriver(std::unique_ptr<Op> op,
scoped_refptr<OpDriver>* driver);
diff --git a/src/kudu/tablet/txn_participant-test.cc
b/src/kudu/tablet/txn_participant-test.cc
index 1413ebde1..920fb0039 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -69,6 +69,7 @@
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -999,7 +1000,7 @@ TEST_F(TxnParticipantTest,
TestActiveParticipantOpsAnchorWALs) {
scoped_refptr<OpDriver> driver;
unique_ptr<DelayedParticipantOp> op(
new DelayedParticipantOp(&apply_start, &apply_continue,
std::move(op_state)));
- ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver));
+ ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver,
MonoTime::Max()));
driver->ExecuteAsync();
// Wait for the apply to start, indicating that we have persisted and
// replicated but not yet Raft committed the participant op.
diff --git a/src/kudu/tserver/tablet_server-test.cc
b/src/kudu/tserver/tablet_server-test.cc
index 54bffc3cc..5ee566ca1 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -200,6 +200,7 @@ DECLARE_int32(maintenance_manager_num_threads);
DECLARE_int32(maintenance_manager_polling_interval_ms);
DECLARE_int32(memory_pressure_percentage);
DECLARE_int32(metrics_retirement_age_ms);
+DECLARE_int32(rpc_num_service_threads);
DECLARE_int32(rpc_service_queue_length);
DECLARE_int32(scanner_batch_size_rows);
DECLARE_int32(scanner_gc_check_interval_us);
@@ -208,6 +209,7 @@ DECLARE_int32(scanner_ttl_ms);
DECLARE_int32(slow_scanner_threshold_ms);
DECLARE_int32(tablet_bootstrap_inject_latency_ms);
DECLARE_int32(tablet_inject_latency_on_apply_write_op_ms);
+DECLARE_int32(tablet_inject_latency_on_prepare_write_op_ms);
DECLARE_int32(workload_stats_rate_collection_min_interval_ms);
DECLARE_int32(workload_stats_metric_collection_interval_ms);
DECLARE_string(block_manager);
@@ -219,6 +221,7 @@ DECLARE_uint32(tablet_apply_pool_overload_threshold_ms);
// Declare these metrics prototypes for simpler unit testing of their behavior.
METRIC_DECLARE_counter(block_manager_total_bytes_read);
METRIC_DECLARE_counter(log_block_manager_holes_punched);
+METRIC_DECLARE_counter(ops_timed_out_in_prepare_queue);
METRIC_DECLARE_counter(rows_inserted);
METRIC_DECLARE_counter(rows_updated);
METRIC_DECLARE_counter(rows_deleted);
@@ -5049,5 +5052,95 @@ TEST_F(OpApplyQueueTest, ApplyQueueBackpressure) {
}
}
+// Fixture for scenarios exercising the tablet's prepare queue: starts a
+// tablet server with a single RPC service thread and one data directory.
+class OpPrepareQueueTest : public TabletServerTestBase {
+ public:
+ void SetUp() override {
+ // To keep the expected order of handling requests that are sent out
+ // asynchronously regardless of any scheduler anomalies, run just
+ // a single thread in the service pool.
+ FLAGS_rpc_num_service_threads = 1;
+
+ NO_FATALS(TabletServerTestBase::SetUp());
+ NO_FATALS(StartTabletServer(/*num_data_dirs=*/1));
+ }
+};
+
+// This test verifies that write requests that time out in the prepare queue
+// aren't processed any further, but responded with TimedOut error status
+// even before starting the PREPARE phase of the operation processing.
+TEST_F(OpPrepareQueueTest, RequestTimesOutInPrepareQueue) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ constexpr const int32_t kLatencyMs = 3000;
+
+ // Instantiate metrics to be used in this scenario.
+ scoped_refptr<TabletReplica> tablet;
+ ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet));
+ const auto& metric_entity = tablet->tablet()->GetMetricEntity();
+ ASSERT_TRUE(metric_entity);
+ scoped_refptr<Counter> timed_out_ops_num =
+ METRIC_ops_timed_out_in_prepare_queue.Instantiate(metric_entity);
+
+ // Build both write requests and run all the preliminary assertions before
+ // sending anything: once an asynchronous RPC is in flight, a failed ASSERT_*
+ // would return from the test while the pending callback still references
+ // 'latch', the responses, and the controllers living on this stack frame.
+ WriteRequestPB req0;
+ req0.set_tablet_id(kTabletId);
+ ASSERT_OK(SchemaToPB(schema_, req0.mutable_schema()));
+ AddTestRowToPB(RowOperationsPB::INSERT, schema_, 0, 0, "req0",
+ req0.mutable_row_operations());
+
+ WriteRequestPB req1;
+ req1.set_tablet_id(kTabletId);
+ ASSERT_OK(SchemaToPB(schema_, req1.mutable_schema()));
+ AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1, 1, "req1",
+ req1.mutable_row_operations());
+
+ // Make sure no operation has timed out in the queue yet.
+ ASSERT_EQ(0, timed_out_ops_num->value());
+
+ // Inject latency into the PREPARE phase of every write operation.
+ FLAGS_tablet_inject_latency_on_prepare_write_op_ms = kLatencyMs;
+
+ // Counted down by the completion callback of each of the two requests.
+ CountDownLatch latch(2);
+ auto cbk = [&latch]() {
+ latch.CountDown();
+ };
+
+ // Set the timeout to be longer than the injected latency to make sure
+ // the first write request doesn't time out.
+ RpcController ctl0;
+ ctl0.set_timeout(MonoDelta::FromMilliseconds(2 * kLatencyMs));
+ WriteResponsePB resp0;
+ proxy_->WriteAsync(req0, &resp0, &ctl0, cbk);
+
+ // Send the second write request to the tablet server. Due to the injected
+ // latency, it takes a long time to process the first request, so the second
+ // request will be waiting in the prepare queue since there is just a single
+ // thread in the tablet's prepare thread pool. Its timeout is less than the
+ // injected latency to make sure it times out while in the prepare queue.
+ RpcController ctl1;
+ ctl1.set_timeout(MonoDelta::FromMilliseconds(kLatencyMs / 2));
+ WriteResponsePB resp1;
+ proxy_->WriteAsync(req1, &resp1, &ctl1, cbk);
+
+ // Wait until both requests have been responded to.
+ latch.Wait();
+
+ // There should be a single operation timed out in the prepare queue: that's
+ // the operation corresponding to 'req1'.
+ ASSERT_EQ(1, timed_out_ops_num->value());
+
+ // The first request should succeed since its timeout is longer than
+ // the injected latency.
+ ASSERT_FALSE(resp0.has_error());
+ ASSERT_OK(ctl0.status());
+
+ // The second request should time out, and the metric reading above proves
+ // it timed out in the prepare queue, so the corresponding operation
+ // hasn't been started.
+ ASSERT_FALSE(resp1.has_error());
+ const auto& s = ctl1.status();
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+}
+
} // namespace tserver
} // namespace kudu
diff --git a/src/kudu/tserver/tablet_service.cc
b/src/kudu/tserver/tablet_service.cc
index a7f50f6cb..c520bda38 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1209,7 +1209,8 @@ void TabletServiceAdminImpl::AlterSchema(const
AlterSchemaRequestPB* req,
new RpcOpCompletionCallback<AlterSchemaResponsePB>(context, resp)));
// Submit the alter schema op. The RPC will be responded to asynchronously.
- Status s = replica->SubmitAlterSchema(std::move(op_state));
+ Status s = replica->SubmitAlterSchema(std::move(op_state),
+ context->GetClientDeadline());
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::UNKNOWN_ERROR,
@@ -1677,7 +1678,7 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
new RpcOpCompletionCallback<WriteResponsePB>(context, resp)));
// Submit the write operation. The RPC will be responded asynchronously.
- s = replica->SubmitWrite(std::move(op_state));
+ s = replica->SubmitWrite(std::move(op_state), deadline);
} else {
if (!FLAGS_enable_txn_system_client_init) {
return SetupErrorAndRespond(