This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f6f6c243971955d57493ae07d6b89e87f6400a82 Author: xinghuayu007 <[email protected]> AuthorDate: Tue Feb 7 15:34:36 2023 +0800 [KUDU-3447] Limit tablets copying speed Copying tablets from one cluster to another with the command kudu local_replica copy_from_remote is a resource-intensive operation. If the data size is very large, the copying process can last for a long time, and other services may be impacted or even become unavailable because the copy consumes too much disk and/or network bandwidth. Therefore, it is better to limit the tablet copying speed to keep the system stable; the goal is a trade-off between tablet copying speed and resource consumption. Since copy_from_remote mainly downloads data from the remote cluster and writes it to the local file system, controlling the download speed is an effective way to control resource consumption. This patch uses a throttler to limit the tablet copying speed. Two parameters are added: --tablet_copy_throttler_bytes_per_sec limits the copying speed in bytes per second (0, the default, means no limit), and --tablet_copy_throttler_burst_factor sets the maximum rate the throttler allows within a token refill period, expressed as a multiple of that base rate. 
Change-Id: I1f4834bfb0718a2b6b1d946975287a11f6be1fe3 Reviewed-on: http://gerrit.cloudera.org:8080/19479 Reviewed-by: Yingchun Lai <[email protected]> Tested-by: Yingchun Lai <[email protected]> --- src/kudu/tools/kudu-tool-test.cc | 41 +++++++++++++++++++ src/kudu/tools/tool_action_local_replica.cc | 19 ++++++++- src/kudu/tserver/tablet_copy_client-test.cc | 63 +++++++++++++++++++++++++++-- src/kudu/tserver/tablet_copy_client.cc | 32 ++++++++++++--- src/kudu/tserver/tablet_copy_client.h | 12 +++++- 5 files changed, 155 insertions(+), 12 deletions(-) diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 0dd4a0db3..8dd5688c3 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -159,6 +159,7 @@ DECLARE_int32(catalog_manager_inject_latency_load_ca_info_ms); DECLARE_int32(flush_threshold_mb); DECLARE_int32(flush_threshold_secs); DECLARE_int32(heartbeat_interval_ms); +DECLARE_int32(tablet_copy_transfer_chunk_size_bytes); DECLARE_int32(tserver_unresponsive_timeout_ms); DECLARE_int32(rpc_negotiation_inject_delay_ms); DECLARE_string(block_manager); @@ -9350,6 +9351,46 @@ TEST_F(ToolTest, TestLocalReplicaCopyRemote) { } } +TEST_F(ToolTest, TestLocalReplicaCopyRemoteWithSpeedLimit) { + InternalMiniClusterOptions opts; + opts.num_tablet_servers = 2; + NO_FATALS(StartMiniCluster(std::move(opts))); + NO_FATALS(CreateTableWithFlushedData("table1", mini_cluster_.get(), 3, 1)); + auto source_tablet_server = mini_cluster_->mini_tablet_server(0); + int source_tserver_tablet_count = source_tablet_server->ListTablets().size(); + ASSERT_GT(source_tserver_tablet_count, 0); + auto target_tablet_server = mini_cluster_->mini_tablet_server(1); + int target_tserver_tablet_count_before = target_tablet_server->ListTablets().size(); + string tablet_ids_str = JoinStrings(source_tablet_server->ListTablets(), ","); + string source_tserver_rpc_addr = source_tablet_server->bound_rpc_addr().ToString(); + string wal_dir = 
target_tablet_server->options()->fs_opts.wal_root; + string data_dirs = JoinStrings(target_tablet_server->options()->fs_opts.data_roots, ","); + NO_FATALS(target_tablet_server->Shutdown()); + // Copy tablet replicas from tserver0 to tserver1. + string stderr; + NO_FATALS(RunActionStdoutStderrString( + Substitute("local_replica copy_from_remote $0 $1 " + "-fs_data_dirs=$2 -fs_wal_dir=$3 -num_threads=3 " + "-tablet_copy_throttler_bytes_per_sec=$4 " + "-tablet_copy_throttler_burst_factor=1", + tablet_ids_str, + source_tserver_rpc_addr, + data_dirs, + wal_dir, 10 * FLAGS_tablet_copy_transfer_chunk_size_bytes), + nullptr, &stderr)); + // To prove that throttler works, but it is hard to compute the copying speed to be limited + // under tablet_copy_throttler_bytes_per_sec. + ASSERT_STR_CONTAINS(stderr, "Time spent Tablet copy throttler"); + NO_FATALS(target_tablet_server->Start()); + vector<string> tids = target_tablet_server->ListTablets(); + unordered_set<string> target_tablet_ids(tids.begin(), tids.end()); + ASSERT_EQ(source_tserver_tablet_count + target_tserver_tablet_count_before, + target_tablet_ids.size()); + for (string tablet_id : source_tablet_server->ListTablets()) { + ASSERT_TRUE(ContainsKey(target_tablet_ids, tablet_id)); + } +} + class DownloadSuperblockInBatchTest : public ToolTest, diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc index 8d92780a3..122822619 100644 --- a/src/kudu/tools/tool_action_local_replica.cc +++ b/src/kudu/tools/tool_action_local_replica.cc @@ -95,6 +95,7 @@ #include "kudu/util/status.h" #include "kudu/util/thread.h" #include "kudu/util/threadpool.h" +#include "kudu/util/throttler.h" // IWYU pragma: no_include <boost/container/vector.hpp> namespace kudu { @@ -159,6 +160,8 @@ DEFINE_string(dst_fs_metadata_dir, "", DECLARE_int32(num_threads); DECLARE_bool(tablet_copy_support_download_superblock_in_batch); DECLARE_int32(tablet_copy_download_threads_nums_per_session); 
+DECLARE_int64(tablet_copy_throttler_bytes_per_sec); +DECLARE_double(tablet_copy_throttler_burst_factor); DECLARE_string(tables); using kudu::consensus::ConsensusMetadata; @@ -327,6 +330,15 @@ class TabletCopier { shared_ptr<Messenger> messenger; RETURN_NOT_OK(BuildMessenger("tablet_copy_client", &messenger)); + + shared_ptr<Throttler> throttler; + if (FLAGS_tablet_copy_throttler_bytes_per_sec > 0) { + throttler = std::make_shared<Throttler>(MonoTime::Now(), + 0, + FLAGS_tablet_copy_throttler_bytes_per_sec, + FLAGS_tablet_copy_throttler_burst_factor); + } + // Start to copy tablets. for (const auto& tablet_id : tablet_ids_to_copy_) { RETURN_NOT_OK(copy_pool->Submit([&]() { @@ -340,8 +352,9 @@ class TabletCopier { Status s; unique_ptr<TabletCopyClient> client; if (copy_type_ == CopyType::FROM_REMOTE) { - client.reset(new RemoteTabletCopyClient(tablet_id, dst_fs_manager_, dst_cmeta_manager_, - messenger, &tablet_copy_client_metrics)); + client.reset(new RemoteTabletCopyClient( + tablet_id, dst_fs_manager_, dst_cmeta_manager_, + messenger, &tablet_copy_client_metrics, throttler)); s = client->Start(source_addr_, nullptr); } else { CHECK_EQ(copy_type_, CopyType::FROM_LOCAL); @@ -1370,6 +1383,8 @@ unique_ptr<Mode> BuildLocalReplicaMode() { .AddOptionalParameter("tablet_copy_download_threads_nums_per_session") .AddOptionalParameter("num_threads") .AddOptionalParameter("tablet_copy_support_download_superblock_in_batch") + .AddOptionalParameter("tablet_copy_throttler_bytes_per_sec") + .AddOptionalParameter("tablet_copy_throttler_burst_factor") .Build(); unique_ptr<Action> copy_from_local = diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc index e938a9ee4..a53f2aa31 100644 --- a/src/kudu/tserver/tablet_copy_client-test.cc +++ b/src/kudu/tserver/tablet_copy_client-test.cc @@ -75,11 +75,13 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" +#include "kudu/util/throttler.h" 
DECLARE_double(env_inject_eio); DECLARE_double(tablet_copy_fault_crash_during_download_block); DECLARE_double(tablet_copy_fault_crash_during_download_wal); DECLARE_int32(tablet_copy_download_threads_nums_per_session); +DECLARE_int32(tablet_copy_transfer_chunk_size_bytes); DECLARE_string(block_manager); DECLARE_string(env_inject_eio_globs); @@ -188,7 +190,8 @@ class TabletCopyClientTest : public TabletCopyTest { FRIEND_TEST(TabletCopyClientBasicTest, TestSupportsLiveRowCount); Status CompareFileContents(const string& path1, const string& path2); - Status ResetRemoteTabletCopyClient(); + Status ResetRemoteTabletCopyClient( + TabletCopyClientMetrics* tablet_copy_client_metrics = nullptr); Status ResetLocalTabletCopyClient(); // Injection of 'supports_live_row_count' modifiers. @@ -212,6 +215,7 @@ class TabletCopyClientTest : public TabletCopyTest { unique_ptr<FsManager> src_fs_manager_; MetricRegistry src_metric_registry_; scoped_refptr<MetricEntity> src_metric_entity_; + std::shared_ptr<Throttler> throttler_; }; Status TabletCopyClientTest::CompareFileContents(const string& path1, const string& path2) { @@ -247,7 +251,8 @@ Status TabletCopyClientTest::CompareFileContents(const string& path1, const stri return Status::OK(); } -Status TabletCopyClientTest::ResetRemoteTabletCopyClient() { +Status TabletCopyClientTest::ResetRemoteTabletCopyClient( + TabletCopyClientMetrics* tablet_copy_client_metrics) { scoped_refptr<ConsensusMetadataManager> cmeta_manager( new ConsensusMetadataManager(fs_manager_.get())); @@ -257,7 +262,9 @@ Status TabletCopyClientTest::ResetRemoteTabletCopyClient() { fs_manager_.get(), cmeta_manager, messenger_, - nullptr /* no metrics */)); + tablet_copy_client_metrics, + throttler_)); + RaftPeerPB* cstate_leader; ConsensusStatePB cstate; RETURN_NOT_OK(tablet_replica_->consensus()->ConsensusState(&cstate)); @@ -307,6 +314,56 @@ Status TabletCopyClientTest::ResetLocalTabletCopyClient() { return Status::OK(); } +class TabletCopyThrottlerTest : public 
TabletCopyClientTest { + public: + TabletCopyThrottlerTest() { + mode_ = TabletCopyMode::REMOTE; + throttler_ = std::make_shared<Throttler>( + MonoTime::Now(), + 0, + FLAGS_tablet_copy_transfer_chunk_size_bytes, + 2 * FLAGS_tablet_copy_transfer_chunk_size_bytes); + } + + void SetUp() override { + TabletCopyClientTest::SetUp(); + } +}; + +TEST_F(TabletCopyThrottlerTest, TestThrottler) { + scoped_refptr<MetricEntity> src_metric_entity_( + METRIC_ENTITY_server.Instantiate(&metric_registry_, "tablet-copy-test")); + TabletCopyClientMetrics tablet_copy_client_metrics(src_metric_entity_); + ASSERT_OK(ResetRemoteTabletCopyClient(&tablet_copy_client_metrics)); + + ASSERT_OK(StartCopy()); + BlockId block_id = FirstColumnBlockId(*client_->remote_superblock_); + Slice slice; + faststring scratch; + + // Ensure the block wasn't there before (it shouldn't be, we use our own FsManager dir). + Status s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice); + ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString(); + + // Check that the client downloaded the block and verification passed. + BlockId new_block_id; + MonoTime start_time = MonoTime::Now(); + ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id)); + MonoTime end_time = MonoTime::Now(); + // Compute the real tablet downloading speed. + double download_speed = tablet_copy_client_metrics.bytes_fetched->value() / + (end_time - start_time).ToSeconds(); + // Real tablet downloading speed must be less than the defined speed. + ASSERT_GE(FLAGS_tablet_copy_transfer_chunk_size_bytes, download_speed); + ASSERT_OK(client_->transaction_->CommitCreatedBlocks()); + // Ensure it placed the block where we expected it to. 
+ ASSERT_OK(ReadLocalBlockFile(fs_manager_.get(), new_block_id, &scratch, &slice)); + // 'client_' must be destroyed before 'tablet_copy_client_metrics', because client + // holds the pointer of 'tablet_copy_client_metrics', and uses 'tablet_copy_client_metrics' + // while being destroyed. See tablet_copy_client.cc. + client_.reset(); +} + class TabletCopyClientBasicTest : public TabletCopyClientTest, public ::testing::WithParamInterface<TabletCopyMode> { public: diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc index 0edd765f4..c9b949027 100644 --- a/src/kudu/tserver/tablet_copy_client.cc +++ b/src/kudu/tserver/tablet_copy_client.cc @@ -17,6 +17,7 @@ #include "kudu/tserver/tablet_copy_client.h" +#include <algorithm> #include <cstdint> #include <functional> #include <memory> @@ -24,7 +25,6 @@ #include <optional> #include <ostream> #include <type_traits> -#include <utility> #include <gflags/gflags.h> #include <glog/logging.h> @@ -75,9 +75,20 @@ #include "kudu/util/scoped_cleanup.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" +#include "kudu/util/stopwatch.h" #include "kudu/util/threadpool.h" +#include "kudu/util/throttler.h" // IWYU pragma: keep #include "kudu/util/trace.h" +DEFINE_int64(tablet_copy_throttler_bytes_per_sec, 0, + "Limit tablet copying speed. It limits the copying speed of all the tablets " + "in one server for one session. The default value is 0, which means not limiting " + "the speed. The unit is bytes/seconds"); +DEFINE_double(tablet_copy_throttler_burst_factor, 1.0f, + "Burst factor for tablet copy throttling. The maximum rate the throttler " + "allows within a token refill period (100ms) equals burst factor multiply " + "base rate. The default value is 1.0, which means the maximun rate is equals " + "to --tablet_copy_throttler_bytes_per_sec."); DEFINE_int32(tablet_copy_begin_session_timeout_ms, 30000, "Tablet server RPC client timeout for BeginTabletCopySession calls. 
" "Also used for EndTabletCopySession calls."); @@ -203,7 +214,8 @@ TabletCopyClient::TabletCopyClient( FsManager* dst_fs_manager, scoped_refptr<ConsensusMetadataManager> cmeta_manager, shared_ptr<Messenger> messenger, - TabletCopyClientMetrics* dst_tablet_copy_metrics) + TabletCopyClientMetrics* dst_tablet_copy_metrics, + shared_ptr<Throttler> throttler) : tablet_id_(std::move(tablet_id)), dst_fs_manager_(dst_fs_manager), cmeta_manager_(std::move(cmeta_manager)), @@ -213,7 +225,8 @@ TabletCopyClient::TabletCopyClient( session_idle_timeout_millis_(FLAGS_tablet_copy_begin_session_timeout_ms), start_time_micros_(0), rng_(GetRandomSeed32()), - dst_tablet_copy_metrics_(dst_tablet_copy_metrics) { + dst_tablet_copy_metrics_(dst_tablet_copy_metrics), + throttler_(std::move(throttler)) { auto bm = dst_fs_manager->block_manager(); transaction_ = bm->NewCreationTransaction(); if (dst_tablet_copy_metrics_) { @@ -997,6 +1010,13 @@ Status RemoteTabletCopyClient::DownloadFile(const DataIdPB& data_id, if (dst_tablet_copy_metrics_) { dst_tablet_copy_metrics_->bytes_fetched->IncrementBy(chunk_size); } + if (throttler_) { + LOG_TIMING(INFO, "Tablet copy throttler") { + while (!throttler_->Take(MonoTime::Now(), 0, chunk_size)) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } + } + } } return Status::OK(); @@ -1072,12 +1092,14 @@ RemoteTabletCopyClient::RemoteTabletCopyClient( FsManager* dst_fs_manager, scoped_refptr<ConsensusMetadataManager> cmeta_manager, shared_ptr<Messenger> messenger, - TabletCopyClientMetrics* dst_tablet_copy_metrics) + TabletCopyClientMetrics* dst_tablet_copy_metrics, + shared_ptr<Throttler> throttler) : TabletCopyClient(std::move(tablet_id), dst_fs_manager, std::move(cmeta_manager), std::move(messenger), - dst_tablet_copy_metrics) { + dst_tablet_copy_metrics, + std::move(throttler)) { } RemoteTabletCopyClient::~RemoteTabletCopyClient() { diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h index 5239477f9..38274aa55 
100644 --- a/src/kudu/tserver/tablet_copy_client.h +++ b/src/kudu/tserver/tablet_copy_client.h @@ -40,6 +40,7 @@ class FsManager; class HostPort; class Schema; class ThreadPool; +class Throttler; class WritableFile; namespace consensus { @@ -143,6 +144,7 @@ class TabletCopyClient { FRIEND_TEST(TabletCopyClientBasicTest, TestDownloadWalSegment); FRIEND_TEST(TabletCopyClientBasicTest, TestDownloadAllBlocks); FRIEND_TEST(TabletCopyClientAbortTest, TestAbort); + FRIEND_TEST(TabletCopyThrottlerTest, TestThrottler); // Construct the tablet copy client. // @@ -151,7 +153,8 @@ class TabletCopyClient { FsManager* dst_fs_manager, scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager, std::shared_ptr<rpc::Messenger> messenger, - TabletCopyClientMetrics* dst_tablet_copy_metrics); + TabletCopyClientMetrics* dst_tablet_copy_metrics, + std::shared_ptr<Throttler> throttler = nullptr); // State machine that guides the progression of a single tablet copy. // A tablet copy will go through the states: @@ -314,6 +317,10 @@ class TabletCopyClient { // reading/updating rowset/wal download status. simple_spinlock simple_lock_; + // A throttler to limit tablet copy speed. + // The throttler_ is shared among multiple tablet copying tasks in the same session. + std::shared_ptr<Throttler> throttler_; + DISALLOW_COPY_AND_ASSIGN(TabletCopyClient); }; @@ -324,7 +331,8 @@ class RemoteTabletCopyClient : public TabletCopyClient { FsManager* dst_fs_manager, scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager, std::shared_ptr<rpc::Messenger> messenger, - TabletCopyClientMetrics* dst_tablet_copy_metrics); + TabletCopyClientMetrics* dst_tablet_copy_metrics, + std::shared_ptr<Throttler> throttler = nullptr); ~RemoteTabletCopyClient() override;
