This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 79812b258 auto_rebalancer: enforce max_moves_per_server per tserver
79812b258 is described below
commit 79812b258e87abf35f95177021c07b785f402bd6
Author: Gabriella Lotz <[email protected]>
AuthorDate: Fri Dec 5 18:13:02 2025 +0100
auto_rebalancer: enforce max_moves_per_server per tserver
Previously, auto_rebalancing_max_moves_per_server was enforced only
as a cluster-wide total (the flag value multiplied by the number of
tservers), allowing individual tservers to temporarily exceed the
limit. This could disrupt workloads on those servers.
Implement per-tserver move tracking with a moves_per_tserver_ map
whose counters are incremented when moves are scheduled in
ExecuteMoves() and decremented when they complete or fail in
CheckReplicaMovesCompleted().
GetMovesUsingRebalancingAlgo() now checks per-tserver limits before
scheduling moves.
Also fix TestMaxMovesPerServer flakiness by using ASSERT_EVENTUALLY
to handle race conditions during metric sampling, and verify limits
per-tserver in addition to the aggregate.
Disable leader rebalancing during the test to prevent interference,
as the test only verifies replica move limits.
Addresses TODO at auto_rebalancer.cc:338.
Tested auto_rebalancer-test.cc by running it 1000 times in debug
mode on dist-test; all runs were successful.
Change-Id: Ic1b73045719dc69cc9e3353d318b7381f144a20d
Reviewed-on: http://gerrit.cloudera.org:8080/23755
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/master/auto_rebalancer-test.cc | 35 ++++++++----
src/kudu/master/auto_rebalancer.cc | 97 +++++++++++++++++++++++++++++++--
src/kudu/master/auto_rebalancer.h | 5 ++
3 files changed, 120 insertions(+), 17 deletions(-)
diff --git a/src/kudu/master/auto_rebalancer-test.cc b/src/kudu/master/auto_rebalancer-test.cc
index 49bdec787..584688194 100644
--- a/src/kudu/master/auto_rebalancer-test.cc
+++ b/src/kudu/master/auto_rebalancer-test.cc
@@ -523,7 +523,8 @@ TEST_F(AutoRebalancerTest, TestMaxMovesPerServer) {
const int kNumTablets = 12;
cluster_opts_.num_tablet_servers = kNumOrigTservers;
- ASSERT_OK(CreateAndStartCluster());
+ // Disable leader rebalancing to avoid interfering with replica rebalancing
test.
+ ASSERT_OK(CreateAndStartCluster(/*enable_leader_rebalance=*/false));
NO_FATALS(CheckAutoRebalancerStarted());
CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
@@ -561,16 +562,28 @@ TEST_F(AutoRebalancerTest, TestMaxMovesPerServer) {
// Check metric 'tablet_copy_open_client_sessions', which must be
// less than the auto_rebalancing_max_moves_per_server, for each tserver.
- MetricByUuid open_copy_clients_by_uuid;
- for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
- const auto& ts = cluster_->mini_tablet_server(i);
- int open_client_sessions = METRIC_tablet_copy_open_client_sessions.
- Instantiate(ts->server()->metric_entity(), 0)->value();
- EmplaceOrDie(&open_copy_clients_by_uuid, ts->uuid(), open_client_sessions);
- }
- // The average number of moves per tablet server should not exceed that
specified.
- ASSERT_GE(FLAGS_auto_rebalancing_max_moves_per_server *
cluster_->num_tablet_servers(),
- AggregateMetricCounts(open_copy_clients_by_uuid, 0,
cluster_->num_tablet_servers()));
+ // Use ASSERT_EVENTUALLY to handle timing issues: the auto-rebalancer
+ // schedules moves asynchronously, so we need to retry the check to allow
+ // for momentary violations during scheduling.
+ ASSERT_EVENTUALLY([&] {
+ MetricByUuid open_copy_clients_by_uuid;
+ for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
+ const auto& ts = cluster_->mini_tablet_server(i);
+ int open_client_sessions =
+
METRIC_tablet_copy_open_client_sessions.Instantiate(ts->server()->metric_entity(),
0)
+ ->value();
+ EmplaceOrDie(&open_copy_clients_by_uuid, ts->uuid(),
open_client_sessions);
+
+ ASSERT_LE(open_client_sessions,
FLAGS_auto_rebalancing_max_moves_per_server)
+ << "Tserver " << ts->uuid() << " exceeded max moves per server";
+ }
+
+ int total_moves =
+ AggregateMetricCounts(open_copy_clients_by_uuid, 0,
cluster_->num_tablet_servers());
+ int max_total = FLAGS_auto_rebalancing_max_moves_per_server *
cluster_->num_tablet_servers();
+ ASSERT_LE(total_moves, max_total)
+ << "Total moves " << total_moves << " exceeds max " << max_total;
+ });
NO_FATALS(CheckNoLeaderMovesScheduled());
}
diff --git a/src/kudu/master/auto_rebalancer.cc b/src/kudu/master/auto_rebalancer.cc
index 39807905b..62516b474 100644
--- a/src/kudu/master/auto_rebalancer.cc
+++ b/src/kudu/master/auto_rebalancer.cc
@@ -17,6 +17,8 @@
#include "kudu/master/auto_rebalancer.h"
+#include <cstdint>
+
#include <atomic>
#include <functional>
#include <memory>
@@ -263,6 +265,14 @@ void AutoRebalancerTask::RunLoop() {
WARN_NOT_OK(CheckReplicaMovesCompleted(&replica_moves),
"scheduled replica move failed to complete");
} while (!replica_moves.empty());
+
+ // Verify that all move counters are properly cleaned up.
+#if DCHECK_IS_ON()
+ for (const auto& entry : moves_per_tserver_) {
+ DCHECK_EQ(0, entry.second) << "Tserver " << entry.first << " still has "
<< entry.second
+ << " moves after all operations completed";
+ }
+#endif
}
}
@@ -331,12 +341,15 @@ Status AutoRebalancerTask::GetMovesUsingRebalancingAlgo(
rebalance::RebalancingAlgo* algo,
CrossLocations cross_location,
vector<Rebalancer::ReplicaMove>* replica_moves) {
-
- auto num_tservers = raw_info.tserver_summaries.size();
- auto max_moves = FLAGS_auto_rebalancing_max_moves_per_server * num_tservers;
- max_moves -= replica_moves->size();
- // TODO(awong): it'd be nice to track the number of on-going moves for each
- // tablet server and enforce the max moves at a more granular level.
+ // Capture the flag value once to ensure consistency throughout this method
+ // and protect against in-flight changes via 'kudu master set_flag'.
+ const int max_moves_per_server = FLAGS_auto_rebalancing_max_moves_per_server;
+
+ // Use signed integers to handle the case where replica_moves->size() might
exceed
+ // the calculated limit, which would cause underflow with unsigned types.
+ const int64_t num_tservers = raw_info.tserver_summaries.size();
+ int64_t max_moves = max_moves_per_server * num_tservers;
+ max_moves -= static_cast<int64_t>(replica_moves->size());
if (max_moves <= 0) {
return Status::OK();
}
@@ -357,7 +370,28 @@ Status AutoRebalancerTask::GetMovesUsingRebalancingAlgo(
unordered_set<string> tablets_in_move;
vector<Rebalancer::ReplicaMove> rep_moves;
+
for (const auto& move : moves) {
+ // Check if this move would exceed the per-tserver limit based on currently
+ // in-flight moves. We check against moves_per_tserver_ (the actual
ongoing moves)
+ // rather than limiting within this batch, since the global max_moves limit
+ // already constrains the batch size.
+ int src_ongoing = moves_per_tserver_[move.from];
+ int dst_ongoing = moves_per_tserver_[move.to];
+
+ if (src_ongoing >= max_moves_per_server || dst_ongoing >=
max_moves_per_server) {
+ // Skip this move as it would violate per-tserver limits.
+ VLOG(1) << Substitute(
+ "Skipping move from $0 to $1: per-tserver limit reached "
+ "(src=$2, dst=$3, limit=$4)",
+ move.from,
+ move.to,
+ src_ongoing,
+ dst_ongoing,
+ max_moves_per_server);
+ continue;
+ }
+
vector<string> tablet_ids;
rebalancer_.FindReplicas(move, raw_info, &tablet_ids);
if (cross_location == CrossLocations::YES) {
@@ -462,6 +496,21 @@ Status AutoRebalancerTask::ExecuteMoves(
ConsensusServiceProxy proxy(messenger_, resolved[0], leader_hp.host());
RETURN_NOT_OK(proxy.BulkChangeConfig(req, &resp, &rpc));
if (resp.has_error()) return StatusFromPB(resp.error().status());
+
+ // Successfully scheduled the move. Increment counters for both source and
destination.
+ moves_per_tserver_[src_ts_uuid]++;
+ if (!dst_ts_uuid.empty()) {
+ moves_per_tserver_[dst_ts_uuid]++;
+ }
+
+ VLOG(1) << Substitute(
+ "Scheduled move: tablet $0 from $1 to $2 "
+ "(src_moves=$3, dst_moves=$4)",
+ tablet_id,
+ src_ts_uuid,
+ dst_ts_uuid,
+ moves_per_tserver_[src_ts_uuid],
+ dst_ts_uuid.empty() ? 0 : moves_per_tserver_[dst_ts_uuid]);
}
return Status::OK();
}
@@ -659,6 +708,20 @@ Status AutoRebalancerTask::CheckReplicaMovesCompleted(
// the problematic one from 'replica_moves'.
Status s = CheckMoveCompleted(move, &move_is_complete);
if (!s.ok()) {
+ // Move failed. Decrement the per-tserver counters.
+ const auto& src_ts_uuid = move.ts_uuid_from;
+ const auto& dst_ts_uuid = move.ts_uuid_to;
+
+ // The counter may be zero if ExecuteMoves() failed before reaching this
move.
+ // ExecuteMoves() uses RETURN_NOT_OK, which exits early on error, so
later
+ // moves in replica_moves never had their counters incremented.
+ if (moves_per_tserver_[src_ts_uuid] > 0) {
+ moves_per_tserver_[src_ts_uuid]--;
+ }
+ if (!dst_ts_uuid.empty() && moves_per_tserver_[dst_ts_uuid] > 0) {
+ moves_per_tserver_[dst_ts_uuid]--;
+ }
+
replica_moves->erase(replica_moves->begin() + i);
LOG(WARNING) << Substitute("Could not move replica: $0", s.ToString());
return s;
@@ -669,8 +732,30 @@ Status AutoRebalancerTask::CheckReplicaMovesCompleted(
}
}
+ // For all completed moves, decrement the per-tserver counters and remove
from list.
int num_indexes = static_cast<int>(indexes_to_remove.size());
for (int j = num_indexes - 1; j >= 0; --j) {
+ const auto& move = (*replica_moves)[indexes_to_remove[j]];
+ const auto& src_ts_uuid = move.ts_uuid_from;
+ const auto& dst_ts_uuid = move.ts_uuid_to;
+
+ // The counter may be zero if ExecuteMoves() failed before reaching this
move.
+ if (moves_per_tserver_[src_ts_uuid] > 0) {
+ moves_per_tserver_[src_ts_uuid]--;
+ }
+ if (!dst_ts_uuid.empty() && moves_per_tserver_[dst_ts_uuid] > 0) {
+ moves_per_tserver_[dst_ts_uuid]--;
+ }
+
+ VLOG(1) << Substitute(
+ "Move completed: tablet $0 from $1 to $2 "
+ "(src_moves=$3, dst_moves=$4)",
+ move.tablet_uuid,
+ src_ts_uuid,
+ dst_ts_uuid,
+ moves_per_tserver_[src_ts_uuid],
+ dst_ts_uuid.empty() ? 0 : moves_per_tserver_[dst_ts_uuid]);
+
replica_moves->erase(replica_moves->begin() + indexes_to_remove[j]);
}
diff --git a/src/kudu/master/auto_rebalancer.h b/src/kudu/master/auto_rebalancer.h
index ecd7d3ca2..64cb5562f 100644
--- a/src/kudu/master/auto_rebalancer.h
+++ b/src/kudu/master/auto_rebalancer.h
@@ -22,6 +22,7 @@
#include <optional>
#include <random>
#include <string>
+#include <unordered_map>
#include <vector>
#include "kudu/gutil/port.h"
@@ -186,6 +187,10 @@ class AutoRebalancerTask {
std::random_device random_device_;
std::mt19937 random_generator_;
+ // Track the number of ongoing moves per tablet server (both source and
destination).
+ // Key is the tserver UUID, value is the count of ongoing moves.
+ std::unordered_map<std::string, int> moves_per_tserver_;
+
// Variables for testing.
std::atomic<int> number_of_loop_iterations_for_test_;
std::atomic<int> moves_scheduled_this_round_for_test_;