This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 274eadfd7 [util] a small clean up on the Throttler class
274eadfd7 is described below
commit 274eadfd79cb2ffcaa0c016376469d70036e11ca
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jul 19 15:38:57 2024 -0700
[util] a small clean up on the Throttler class
This patch is motivated by the idea of making the API of the Throttler
class more robust, which came up while reviewing a changelist that used
the Throttler's functionality. I have doubts that the current
implementation of the Throttler behaves the way one would expect
(the test coverage for that is spotty), but that's another story.
Change-Id: I43d60323c3d84da896c1a5429dfb7d461a24f9b2
Reviewed-on: http://gerrit.cloudera.org:8080/21603
Reviewed-by: Yingchun Lai <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
src/kudu/tablet/tablet.cc | 5 ++-
src/kudu/tools/table_scanner.cc | 6 ++--
src/kudu/tools/tool_action_local_replica.cc | 3 +-
src/kudu/tserver/tablet_copy_client-test.cc | 1 -
src/kudu/tserver/tablet_copy_client.cc | 4 +--
src/kudu/util/throttler-test.cc | 22 ++++++++----
src/kudu/util/throttler.cc | 48 ++++++++++++++++---------
src/kudu/util/throttler.h | 55 ++++++++++++++++++-----------
8 files changed, 89 insertions(+), 55 deletions(-)
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index d6d811d6a..909d5df0e 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -386,8 +386,7 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata,
FLAGS_tablet_compaction_budget_mb, metrics_.get()));
if (FLAGS_tablet_throttler_rpc_per_sec > 0 ||
FLAGS_tablet_throttler_bytes_per_sec > 0) {
- throttler_.reset(new Throttler(MonoTime::Now(),
- FLAGS_tablet_throttler_rpc_per_sec,
+ throttler_.reset(new Throttler(FLAGS_tablet_throttler_rpc_per_sec,
FLAGS_tablet_throttler_bytes_per_sec,
FLAGS_tablet_throttler_burst_factor));
}
@@ -1795,7 +1794,7 @@ bool Tablet::ShouldThrottleAllow(int64_t bytes) {
if (!throttler_) {
return true;
}
- return throttler_->Take(MonoTime::Now(), 1, bytes);
+ return throttler_->Take(1, bytes);
}
Status Tablet::PickRowSetsToCompact(RowSetsInCompactionOrFlush *picked,
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index eebb94a41..fac0ce949 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -582,7 +582,7 @@ TableScanner::TableScanner(
out_(nullptr) {
CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
if (FLAGS_table_copy_throttler_bytes_per_sec > 0) {
- throttler_ = std::make_shared<Throttler>(MonoTime::Now(), 0,
+ throttler_ = std::make_shared<Throttler>(Throttler::kNoLimit,
FLAGS_table_copy_throttler_bytes_per_sec,
FLAGS_table_copy_throttler_burst_factor);
}
@@ -611,9 +611,9 @@ Status TableScanner::ScanData(const vector<KuduScanToken*>&
tokens,
// Limit table copy speed.
if (throttler_) {
SCOPED_LOG_SLOW_EXECUTION(WARNING, 1000, "Table copy throttler");
- while (!throttler_->Take(MonoTime::Now(), 0,
+ while (!throttler_->Take(0,
batch.direct_data().size() +
batch.indirect_data().size())) {
- SleepFor(MonoDelta::FromMilliseconds(10));
+ SleepFor(MonoDelta::FromMicroseconds(Throttler::kRefillPeriodMicros
/ 2));
}
}
RETURN_NOT_OK(cb(batch));
diff --git a/src/kudu/tools/tool_action_local_replica.cc
b/src/kudu/tools/tool_action_local_replica.cc
index 2449db455..5c15d6bbd 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -334,8 +334,7 @@ class TabletCopier {
shared_ptr<Throttler> throttler;
if (FLAGS_tablet_copy_throttler_bytes_per_sec > 0) {
- throttler = std::make_shared<Throttler>(MonoTime::Now(),
- 0,
+ throttler = std::make_shared<Throttler>(0,
FLAGS_tablet_copy_throttler_bytes_per_sec,
FLAGS_tablet_copy_throttler_burst_factor);
}
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc
b/src/kudu/tserver/tablet_copy_client-test.cc
index a53f2aa31..fadaca536 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -319,7 +319,6 @@ class TabletCopyThrottlerTest : public TabletCopyClientTest
{
TabletCopyThrottlerTest() {
mode_ = TabletCopyMode::REMOTE;
throttler_ = std::make_shared<Throttler>(
- MonoTime::Now(),
0,
FLAGS_tablet_copy_transfer_chunk_size_bytes,
2 * FLAGS_tablet_copy_transfer_chunk_size_bytes);
diff --git a/src/kudu/tserver/tablet_copy_client.cc
b/src/kudu/tserver/tablet_copy_client.cc
index d392c72c2..85772f813 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -1036,8 +1036,8 @@ Status RemoteTabletCopyClient::DownloadFile(const
DataIdPB& data_id,
}
if (throttler_) {
LOG_TIMING(INFO, "Tablet copy throttler") {
- while (!throttler_->Take(MonoTime::Now(), 0, chunk_size)) {
- SleepFor(MonoDelta::FromMilliseconds(10));
+ while (!throttler_->Take(0, chunk_size)) {
+ SleepFor(MonoDelta::FromMicroseconds(Throttler::kRefillPeriodMicros
/ 2));
}
}
}
diff --git a/src/kudu/util/throttler-test.cc b/src/kudu/util/throttler-test.cc
index ff97eb5da..3c391949b 100644
--- a/src/kudu/util/throttler-test.cc
+++ b/src/kudu/util/throttler-test.cc
@@ -17,6 +17,8 @@
#include "kudu/util/throttler.h"
+#include <string>
+
#include <gtest/gtest.h>
#include "kudu/util/monotime.h"
@@ -27,7 +29,13 @@ namespace kudu {
class ThrottlerTest : public KuduTest {
};
-TEST_F(ThrottlerTest, TestOpThrottle) {
+TEST_F(ThrottlerTest, Basic) {
+ Throttler t(1, 1, 1.0);
+ ASSERT_TRUE(t.Take(0, 1));
+ ASSERT_TRUE(t.Take(1, 0));
+}
+
+TEST_F(ThrottlerTest, OpThrottle) {
// Check operation rate throttling
MonoTime now = MonoTime::Now();
Throttler t0(now, 1000, 1000*1000, 1);
@@ -39,12 +47,12 @@ TEST_F(ThrottlerTest, TestOpThrottle) {
ASSERT_TRUE(t0.Take(now, 1, 1));
}
ASSERT_FALSE(t0.Take(now, 1, 1));
- now += MonoDelta::FromMilliseconds(100);
+ now += MonoDelta::FromMicroseconds(Throttler::kRefillPeriodMicros);
}
}
-TEST_F(ThrottlerTest, TestIOThrottle) {
- // Check operation rate throttling
+TEST_F(ThrottlerTest, IOThrottle) {
+ // Check IO rate throttling
MonoTime now = MonoTime::Now();
Throttler t0(now, 50000, 1000*1000, 1);
// Fill up bucket
@@ -55,12 +63,12 @@ TEST_F(ThrottlerTest, TestIOThrottle) {
ASSERT_TRUE(t0.Take(now, 1, 1000));
}
ASSERT_FALSE(t0.Take(now, 1, 1000));
- now += MonoDelta::FromMilliseconds(100);
+ now += MonoDelta::FromMilliseconds(Throttler::kRefillPeriodMicros);
}
}
-TEST_F(ThrottlerTest, TestBurst) {
- // Check IO rate throttling
+TEST_F(ThrottlerTest, Burst) {
+ // Check throttling for bursty consuming
MonoTime now = MonoTime::Now();
Throttler t0(now, 2000, 1000*1000, 5);
// Fill up bucket
diff --git a/src/kudu/util/throttler.cc b/src/kudu/util/throttler.cc
index 69e0f99dc..f4b35428c 100644
--- a/src/kudu/util/throttler.cc
+++ b/src/kudu/util/throttler.cc
@@ -18,33 +18,49 @@
#include "kudu/util/throttler.h"
#include <algorithm>
-#include <mutex>
+
+#include <glog/logging.h>
namespace kudu {
-Throttler::Throttler(MonoTime now, uint64_t op_rate, uint64_t byte_rate,
double burst_factor) :
- next_refill_(now) {
- op_refill_ = op_rate / (MonoTime::kMicrosecondsPerSecond /
kRefillPeriodMicros);
- op_token_ = 0;
- op_token_max_ = static_cast<uint64_t>(op_refill_ * burst_factor);
- byte_refill_ = byte_rate / (MonoTime::kMicrosecondsPerSecond /
kRefillPeriodMicros);
- byte_token_ = 0;
- byte_token_max_ = static_cast<uint64_t>(byte_refill_ * burst_factor);
+Throttler::Throttler(uint64_t op_rate_per_sec,
+ uint64_t byte_rate_per_sec,
+ double burst_factor)
+ : Throttler(MonoTime::Now(), op_rate_per_sec, byte_rate_per_sec,
burst_factor) {
+}
+
+Throttler::Throttler(MonoTime now,
+ uint64_t op_rate_per_sec,
+ uint64_t byte_rate_per_sec,
+ double burst_factor)
+ : byte_refill_(byte_rate_per_sec / (MonoTime::kMicrosecondsPerSecond /
kRefillPeriodMicros)),
+ byte_token_max_(static_cast<uint64_t>(byte_refill_ * burst_factor)),
+ op_refill_(op_rate_per_sec / (MonoTime::kMicrosecondsPerSecond /
kRefillPeriodMicros)),
+ op_token_max_(static_cast<uint64_t>(op_refill_ * burst_factor)),
+ byte_token_(0),
+ op_token_(0),
+ next_refill_(now) {
}
-bool Throttler::Take(MonoTime now, uint64_t op, uint64_t byte) {
- if (op_refill_ == 0 && byte_refill_ == 0) {
+bool Throttler::Take(uint64_t ops, uint64_t bytes) {
+ return Take(MonoTime::Now(), ops, bytes);
+}
+
+bool Throttler::Take(MonoTime now, uint64_t ops, uint64_t bytes) {
+ DCHECK(ops > 0 || bytes > 0);
+ if (op_refill_ == kNoLimit && byte_refill_ == kNoLimit) {
return true;
}
- std::lock_guard<simple_spinlock> lock(lock_);
+
+ std::lock_guard lock(lock_);
Refill(now);
- if ((op_refill_ == 0 || op <= op_token_) &&
- (byte_refill_ == 0 || byte <= byte_token_)) {
+ if ((op_refill_ == 0 || ops <= op_token_) &&
+ (byte_refill_ == 0 || bytes <= byte_token_)) {
if (op_refill_ > 0) {
- op_token_ -= op;
+ op_token_ -= ops;
}
if (byte_refill_ > 0) {
- byte_token_ -= byte;
+ byte_token_ -= bytes;
}
return true;
}
diff --git a/src/kudu/util/throttler.h b/src/kudu/util/throttler.h
index 559409120..191408077 100644
--- a/src/kudu/util/throttler.h
+++ b/src/kudu/util/throttler.h
@@ -14,49 +14,62 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_UTIL_THROTTLER_H
-#define KUDU_UTIL_THROTTLER_H
+#pragma once
#include <cstdint>
+#include <gtest/gtest_prod.h>
+
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
namespace kudu {
// A throttler to throttle both operation/s and IO byte/s.
-class Throttler {
+class Throttler final {
public:
// Refill period is 100ms.
- enum {
- kRefillPeriodMicros = 100000
- };
+ static constexpr const int64_t kRefillPeriodMicros = 100000;
+ static constexpr const uint64_t kNoLimit = 0;
// Construct a throttler with max operation per second, max IO bytes per
second
- // and burst factor (burst_rate = rate * burst), burst rate means maximum
- // throughput within one refill period.
- // Set op_per_sec to 0 to disable operation throttling.
- // Set byte_per_sec to 0 to disable IO bytes throttling.
- Throttler(MonoTime now, uint64_t op_per_sec, uint64_t byte_per_sec, double
burst_factor);
+ // and burst factor (burst_rate = rate * burst_factor), burst_rate means
+ // the maximum throughput within one refill period.
+ // Set op_rate_per_sec to kNoLimit to disable operation rate throttling.
+ // Set byte_rate_per_sec to kNoLimit to disable IO rate throttling.
+ Throttler(uint64_t op_rate_per_sec,
+ uint64_t byte_rate_per_sec,
+ double burst_factor);
- // Throttle an "operation group" by taking 'op' operation tokens and 'byte'
byte tokens.
+ // Throttle an "operation group" by taking 'ops' operation tokens and 'bytes'
+ // byte tokens.
// Return true if there are enough tokens, and operation is allowed.
// Return false if there are not enough tokens, and operation is throttled.
- bool Take(MonoTime now, uint64_t op, uint64_t byte);
+ bool Take(uint64_t ops, uint64_t bytes);
private:
+ FRIEND_TEST(ThrottlerTest, OpThrottle);
+ FRIEND_TEST(ThrottlerTest, IOThrottle);
+ FRIEND_TEST(ThrottlerTest, Burst);
+
+ Throttler(MonoTime now,
+ uint64_t op_rate_per_sec,
+ uint64_t byte_rate_per_sec,
+ double burst_factor);
+
+ bool Take(MonoTime now, uint64_t ops, uint64_t bytes);
+
void Refill(MonoTime now);
- MonoTime next_refill_;
- uint64_t op_refill_;
- uint64_t op_token_;
- uint64_t op_token_max_;
- uint64_t byte_refill_;
+ const uint64_t byte_refill_;
+ const uint64_t byte_token_max_;
+ const uint64_t op_refill_;
+ const uint64_t op_token_max_;
+
uint64_t byte_token_;
- uint64_t byte_token_max_;
+ uint64_t op_token_;
+ MonoTime next_refill_;
simple_spinlock lock_;
};
} // namespace kudu
-
-#endif