This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit c6d438ab417009e8007a1de274178d0bcf0dfb63 Author: Alexey Serbin <[email protected]> AuthorDate: Fri Aug 7 20:51:04 2020 -0700 [tserver] add test to reproduce KUDU-1587 conditions Added a test to reproduce conditions described in KUDU-1587. As of now, the test is disabled: it will be enabled once KUDU-1587 is addressed. Change-Id: I515a1b26152680ee9b9361afcf84fec39b8f962d Reviewed-on: http://gerrit.cloudera.org:8080/16312 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/tserver/tablet_server-test.cc | 133 +++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc index 62de1d8..dcab726 100644 --- a/src/kudu/tserver/tablet_server-test.cc +++ b/src/kudu/tserver/tablet_server-test.cc @@ -73,6 +73,7 @@ #include "kudu/gutil/strings/escaping.h" #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/sysinfo.h" #include "kudu/rpc/messenger.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/rpc/rpc_header.pb.h" @@ -139,9 +140,11 @@ using kudu::tablet::Tablet; using kudu::tablet::TabletReplica; using kudu::tablet::TabletStatePB; using kudu::tablet::TabletSuperBlockPB; +using std::endl; using std::make_shared; using std::map; using std::pair; +using std::ostringstream; using std::set; using std::shared_ptr; using std::string; @@ -184,9 +187,11 @@ DECLARE_int32(maintenance_manager_num_threads); DECLARE_int32(maintenance_manager_polling_interval_ms); DECLARE_int32(memory_pressure_percentage); DECLARE_int32(metrics_retirement_age_ms); +DECLARE_int32(rpc_service_queue_length); DECLARE_int32(scanner_batch_size_rows); DECLARE_int32(scanner_gc_check_interval_us); DECLARE_int32(scanner_ttl_ms); +DECLARE_int32(tablet_inject_latency_on_apply_write_op_ms); DECLARE_int32(workload_stats_rate_collection_min_interval_ms); DECLARE_int32(workload_stats_metric_collection_interval_ms); 
DECLARE_string(block_manager); @@ -199,6 +204,8 @@ METRIC_DECLARE_counter(log_block_manager_holes_punched); METRIC_DECLARE_counter(rows_inserted); METRIC_DECLARE_counter(rows_updated); METRIC_DECLARE_counter(rows_deleted); +METRIC_DECLARE_counter(rpcs_queue_overflow); +METRIC_DECLARE_counter(rpcs_timed_out_in_queue); METRIC_DECLARE_counter(scanners_expired); METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management); METRIC_DECLARE_gauge_uint64(log_block_manager_containers); @@ -206,6 +213,9 @@ METRIC_DECLARE_gauge_size(active_scanners); METRIC_DECLARE_gauge_size(tablet_active_scanners); METRIC_DECLARE_gauge_size(num_rowsets_on_disk); METRIC_DECLARE_histogram(flush_dms_duration); +METRIC_DECLARE_histogram(op_apply_queue_length); +METRIC_DECLARE_histogram(op_apply_queue_time); + namespace kudu { @@ -4351,5 +4361,128 @@ TEST_F(TabletServerTest, TestStarvePerfImprovementOpsInColdTablet) { }); } +class OpApplyQueueTest : public TabletServerTestBase { + public: + // Starts the tablet server; override this method to start it later. + void SetUp() override { + // Since scenarios of this test make bursts of requests, set the maximum + // length of the service queue to accommodate many requests. + FLAGS_rpc_service_queue_length = 1000; + NO_FATALS(TabletServerTestBase::SetUp()); + NO_FATALS(StartTabletServer(/*num_data_dirs=*/1)); + } + + static constexpr const int32_t kInjectedLatencyMs = 100; // Latency to inject into WriteOp::Apply(). +}; + +// This is a regression test for KUDU-1587 (apply queue backpressure). +// TODO(aserbin): enable the test once KUDU-1587 is addressed. +TEST_F(OpApplyQueueTest, DISABLED_ApplyQueueBackpressure) { + SKIP_IF_SLOW_NOT_ALLOWED(); + + constexpr size_t kNumCalls = 1000; + const int num_cpus = base::NumCPUs(); + WriteRequestPB req; + req.set_tablet_id(kTabletId); + ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); + + // Inject latency into WriteOp::Apply(). + FLAGS_tablet_inject_latency_on_apply_write_op_ms = kInjectedLatencyMs; + + // Send many calls to the tablet server without waiting for responses.
+ // After sending the first chunk, wait for some time to let the operations + // propagate to the apply queue and let their apply queue times grow. + vector<unique_ptr<RpcController>> controllers; + vector<unique_ptr<WriteResponsePB>> responses; + CountDownLatch latch(kNumCalls); + for (size_t idx = 0; idx < kNumCalls; ++idx) { + controllers.emplace_back(new RpcController); + responses.emplace_back(new WriteResponsePB); + + req.clear_row_operations(); + auto* data = req.mutable_row_operations(); + AddTestRowWithNullableStringToPB( + RowOperationsPB::INSERT, schema_, idx, idx, nullptr, data); + proxy_->AsyncRequest("Write", req, responses.back().get(), controllers.back().get(), + [&latch]() { latch.CountDown(); }); + if (idx == 10 * num_cpus) { + // Pause so the ops sent so far reach the apply queue and their + // queue times start growing. + SleepFor(MonoDelta::FromMilliseconds(2 * kInjectedLatencyMs)); + } + } + + // Wait for all calls to complete before capturing the apply queue stats. + latch.Wait(); + + size_t num_ok = 0; + size_t num_error = 0; + for (const auto& ctl : controllers) { + if (ctl->status().ok()) { + ++num_ok; + } else { + ++num_error; + } + } + ASSERT_EQ(kNumCalls, num_ok + num_error); + + // Not all requests should succeed -- due to long apply times, some should be + // rejected. + EXPECT_GT(num_error, 0); + + { + // No RPC queue overflows are expected. + auto rpc_queue_overflows = METRIC_rpcs_queue_overflow.Instantiate( + mini_server_->server()->metric_entity()); + ASSERT_EQ(0, rpc_queue_overflows->value()); + } + + { + // No requests should time out while in the queue.
+ auto timed_out_in_rpc_queue = METRIC_rpcs_timed_out_in_queue.Instantiate( + mini_server_->server()->metric_entity()); + ASSERT_EQ(0, timed_out_in_rpc_queue->value()); + } + + { + // Some calls should be rejected due to the overloaded apply queue, + // so the corresponding queue times should not get too close to the value of + // total_requests * injected_latency / number_of_apply_threads (num_cpus). + auto qt = METRIC_op_apply_queue_time.Instantiate( + mini_server_->server()->metric_entity()); + ostringstream ostr; + ostr << qt->prototype()->name() << ":" << endl; + const auto* h = qt->histogram(); + h->DumpHumanReadable(&ostr); + LOG(INFO) << ostr.str(); + + // These are simple heuristics rather than exact theoretical thresholds. + // They depend on the injected latency and the 'overloaded' threshold + // for the apply queue; 'ms * 1000' expresses the bounds in microseconds. + EXPECT_LT(h->MaxValue(), + kInjectedLatencyMs * 1000 * kNumCalls * 4 / (4 * num_cpus)); + EXPECT_LT(h->ValueAtPercentile(99), + kInjectedLatencyMs * 1000 * kNumCalls * 3 / (4 * num_cpus)); + EXPECT_LT(h->ValueAtPercentile(75), + kInjectedLatencyMs * 1000 * kNumCalls * 2 / (4 * num_cpus)); + } + + { + // With the injected apply latency, the queue should not get too long. + auto ql = METRIC_op_apply_queue_length.Instantiate( + mini_server_->server()->metric_entity()); + ostringstream ostr; + ostr << ql->prototype()->name() << ":" << endl; + const auto* h = ql->histogram(); + h->DumpHumanReadable(&ostr); + LOG(INFO) << ostr.str(); + + // These are simple heuristics as well. They depend on the injected latency + // and the 'overloaded' threshold for the apply queue. + EXPECT_LT(h->MaxValue(), 3 * kNumCalls / 4); + EXPECT_LT(h->MeanValue(), kNumCalls / 2); + } +} + } // namespace tserver } // namespace kudu
