This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 79812b258 auto_rebalancer: enforce max_moves_per_server per tserver
79812b258 is described below

commit 79812b258e87abf35f95177021c07b785f402bd6
Author: Gabriella Lotz <[email protected]>
AuthorDate: Fri Dec 5 18:13:02 2025 +0100

    auto_rebalancer: enforce max_moves_per_server per tserver
    
    Previously, auto_rebalancing_max_moves_per_server was enforced
    globally, allowing individual tservers to temporarily exceed the
    limit. This could disrupt workloads on those servers.
    
    Implement per-tserver move tracking with a moves_per_tserver_ map
    whose counters are incremented when moves are scheduled in
    ExecuteMoves() and decremented when they complete or fail in
    CheckReplicaMovesCompleted(). GetMovesUsingRebalancingAlgo() now
    checks per-tserver limits before scheduling moves.
    
    Also fix TestMaxMovesPerServer flakiness by using ASSERT_EVENTUALLY
    to handle race conditions during metric sampling, and verify limits
    per-tserver in addition to the aggregate.
    Disable leader rebalancing during the test to prevent interference,
    as the test only verifies replica move limits.
    
    Addresses TODO at auto_rebalancer.cc:338.
    
    Tested auto_rebalancer-test.cc by running it 1000 times in debug
    mode on dist-test and all tests were successful.
    
    Change-Id: Ic1b73045719dc69cc9e3353d318b7381f144a20d
    Reviewed-on: http://gerrit.cloudera.org:8080/23755
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/master/auto_rebalancer-test.cc | 35 ++++++++----
 src/kudu/master/auto_rebalancer.cc      | 97 +++++++++++++++++++++++++++++++--
 src/kudu/master/auto_rebalancer.h       |  5 ++
 3 files changed, 120 insertions(+), 17 deletions(-)

diff --git a/src/kudu/master/auto_rebalancer-test.cc 
b/src/kudu/master/auto_rebalancer-test.cc
index 49bdec787..584688194 100644
--- a/src/kudu/master/auto_rebalancer-test.cc
+++ b/src/kudu/master/auto_rebalancer-test.cc
@@ -523,7 +523,8 @@ TEST_F(AutoRebalancerTest, TestMaxMovesPerServer) {
   const int kNumTablets = 12;
 
   cluster_opts_.num_tablet_servers = kNumOrigTservers;
-  ASSERT_OK(CreateAndStartCluster());
+  // Disable leader rebalancing to avoid interfering with replica rebalancing 
test.
+  ASSERT_OK(CreateAndStartCluster(/*enable_leader_rebalance=*/false));
   NO_FATALS(CheckAutoRebalancerStarted());
 
   CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
@@ -561,16 +562,28 @@ TEST_F(AutoRebalancerTest, TestMaxMovesPerServer) {
 
   // Check metric 'tablet_copy_open_client_sessions', which must be
   // less than the auto_rebalancing_max_moves_per_server, for each tserver.
-  MetricByUuid open_copy_clients_by_uuid;
-  for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
-    const auto& ts = cluster_->mini_tablet_server(i);
-    int open_client_sessions = METRIC_tablet_copy_open_client_sessions.
-        Instantiate(ts->server()->metric_entity(), 0)->value();
-    EmplaceOrDie(&open_copy_clients_by_uuid, ts->uuid(), open_client_sessions);
-  }
-  // The average number of moves per tablet server should not exceed that 
specified.
-  ASSERT_GE(FLAGS_auto_rebalancing_max_moves_per_server * 
cluster_->num_tablet_servers(),
-      AggregateMetricCounts(open_copy_clients_by_uuid, 0, 
cluster_->num_tablet_servers()));
+  // Use ASSERT_EVENTUALLY to handle timing issues: the auto-rebalancer
+  // schedules moves asynchronously, so we need to retry the check to allow
+  // for momentary violations during scheduling.
+  ASSERT_EVENTUALLY([&] {
+    MetricByUuid open_copy_clients_by_uuid;
+    for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
+      const auto& ts = cluster_->mini_tablet_server(i);
+      int open_client_sessions =
+          
METRIC_tablet_copy_open_client_sessions.Instantiate(ts->server()->metric_entity(),
 0)
+              ->value();
+      EmplaceOrDie(&open_copy_clients_by_uuid, ts->uuid(), 
open_client_sessions);
+
+      ASSERT_LE(open_client_sessions, 
FLAGS_auto_rebalancing_max_moves_per_server)
+          << "Tserver " << ts->uuid() << " exceeded max moves per server";
+    }
+
+    int total_moves =
+        AggregateMetricCounts(open_copy_clients_by_uuid, 0, 
cluster_->num_tablet_servers());
+    int max_total = FLAGS_auto_rebalancing_max_moves_per_server * 
cluster_->num_tablet_servers();
+    ASSERT_LE(total_moves, max_total)
+        << "Total moves " << total_moves << " exceeds max " << max_total;
+  });
 
   NO_FATALS(CheckNoLeaderMovesScheduled());
 }
diff --git a/src/kudu/master/auto_rebalancer.cc 
b/src/kudu/master/auto_rebalancer.cc
index 39807905b..62516b474 100644
--- a/src/kudu/master/auto_rebalancer.cc
+++ b/src/kudu/master/auto_rebalancer.cc
@@ -17,6 +17,8 @@
 
 #include "kudu/master/auto_rebalancer.h"
 
+#include <cstdint>
+
 #include <atomic>
 #include <functional>
 #include <memory>
@@ -263,6 +265,14 @@ void AutoRebalancerTask::RunLoop() {
       WARN_NOT_OK(CheckReplicaMovesCompleted(&replica_moves),
                   "scheduled replica move failed to complete");
     } while (!replica_moves.empty());
+
+    // Verify that all move counters are properly cleaned up.
+#if DCHECK_IS_ON()
+    for (const auto& entry : moves_per_tserver_) {
+      DCHECK_EQ(0, entry.second) << "Tserver " << entry.first << " still has " 
<< entry.second
+                                 << " moves after all operations completed";
+    }
+#endif
   }
 }
 
@@ -331,12 +341,15 @@ Status AutoRebalancerTask::GetMovesUsingRebalancingAlgo(
   rebalance::RebalancingAlgo* algo,
   CrossLocations cross_location,
   vector<Rebalancer::ReplicaMove>* replica_moves) {
-
-  auto num_tservers = raw_info.tserver_summaries.size();
-  auto max_moves = FLAGS_auto_rebalancing_max_moves_per_server * num_tservers;
-  max_moves -= replica_moves->size();
-  // TODO(awong): it'd be nice to track the number of on-going moves for each
-  // tablet server and enforce the max moves at a more granular level.
+  // Capture the flag value once to ensure consistency throughout this method
+  // and protect against in-flight changes via 'kudu master set_flag'.
+  const int max_moves_per_server = FLAGS_auto_rebalancing_max_moves_per_server;
+
+  // Use signed integers to handle the case where replica_moves->size() might 
exceed
+  // the calculated limit, which would cause underflow with unsigned types.
+  const int64_t num_tservers = raw_info.tserver_summaries.size();
+  int64_t max_moves = max_moves_per_server * num_tservers;
+  max_moves -= static_cast<int64_t>(replica_moves->size());
   if (max_moves <= 0) {
     return Status::OK();
   }
@@ -357,7 +370,28 @@ Status AutoRebalancerTask::GetMovesUsingRebalancingAlgo(
 
   unordered_set<string> tablets_in_move;
   vector<Rebalancer::ReplicaMove> rep_moves;
+
   for (const auto& move : moves) {
+    // Check if this move would exceed the per-tserver limit based on currently
+    // in-flight moves. We check against moves_per_tserver_ (the actual 
ongoing moves)
+    // rather than limiting within this batch, since the global max_moves limit
+    // already constrains the batch size.
+    int src_ongoing = moves_per_tserver_[move.from];
+    int dst_ongoing = moves_per_tserver_[move.to];
+
+    if (src_ongoing >= max_moves_per_server || dst_ongoing >= 
max_moves_per_server) {
+      // Skip this move as it would violate per-tserver limits.
+      VLOG(1) << Substitute(
+          "Skipping move from $0 to $1: per-tserver limit reached "
+          "(src=$2, dst=$3, limit=$4)",
+          move.from,
+          move.to,
+          src_ongoing,
+          dst_ongoing,
+          max_moves_per_server);
+      continue;
+    }
+
     vector<string> tablet_ids;
     rebalancer_.FindReplicas(move, raw_info, &tablet_ids);
     if (cross_location == CrossLocations::YES) {
@@ -462,6 +496,21 @@ Status AutoRebalancerTask::ExecuteMoves(
     ConsensusServiceProxy proxy(messenger_, resolved[0], leader_hp.host());
     RETURN_NOT_OK(proxy.BulkChangeConfig(req, &resp, &rpc));
     if (resp.has_error()) return StatusFromPB(resp.error().status());
+
+    // Successfully scheduled the move. Increment counters for both source and 
destination.
+    moves_per_tserver_[src_ts_uuid]++;
+    if (!dst_ts_uuid.empty()) {
+      moves_per_tserver_[dst_ts_uuid]++;
+    }
+
+    VLOG(1) << Substitute(
+        "Scheduled move: tablet $0 from $1 to $2 "
+        "(src_moves=$3, dst_moves=$4)",
+        tablet_id,
+        src_ts_uuid,
+        dst_ts_uuid,
+        moves_per_tserver_[src_ts_uuid],
+        dst_ts_uuid.empty() ? 0 : moves_per_tserver_[dst_ts_uuid]);
   }
   return Status::OK();
 }
@@ -659,6 +708,20 @@ Status AutoRebalancerTask::CheckReplicaMovesCompleted(
     // the problematic one from 'replica_moves'.
     Status s = CheckMoveCompleted(move, &move_is_complete);
     if (!s.ok()) {
+      // Move failed. Decrement the per-tserver counters.
+      const auto& src_ts_uuid = move.ts_uuid_from;
+      const auto& dst_ts_uuid = move.ts_uuid_to;
+
+      // The counter may be zero if ExecuteMoves() failed before reaching this 
move.
+      // ExecuteMoves() uses RETURN_NOT_OK, which exits early on error, so 
later
+      // moves in replica_moves never had their counters incremented.
+      if (moves_per_tserver_[src_ts_uuid] > 0) {
+        moves_per_tserver_[src_ts_uuid]--;
+      }
+      if (!dst_ts_uuid.empty() && moves_per_tserver_[dst_ts_uuid] > 0) {
+        moves_per_tserver_[dst_ts_uuid]--;
+      }
+
       replica_moves->erase(replica_moves->begin() + i);
       LOG(WARNING) << Substitute("Could not move replica: $0", s.ToString());
       return s;
@@ -669,8 +732,30 @@ Status AutoRebalancerTask::CheckReplicaMovesCompleted(
     }
   }
 
+  // For all completed moves, decrement the per-tserver counters and remove 
from list.
   int num_indexes = static_cast<int>(indexes_to_remove.size());
   for (int j = num_indexes - 1; j >= 0; --j) {
+    const auto& move = (*replica_moves)[indexes_to_remove[j]];
+    const auto& src_ts_uuid = move.ts_uuid_from;
+    const auto& dst_ts_uuid = move.ts_uuid_to;
+
+    // The counter may be zero if ExecuteMoves() failed before reaching this 
move.
+    if (moves_per_tserver_[src_ts_uuid] > 0) {
+      moves_per_tserver_[src_ts_uuid]--;
+    }
+    if (!dst_ts_uuid.empty() && moves_per_tserver_[dst_ts_uuid] > 0) {
+      moves_per_tserver_[dst_ts_uuid]--;
+    }
+
+    VLOG(1) << Substitute(
+        "Move completed: tablet $0 from $1 to $2 "
+        "(src_moves=$3, dst_moves=$4)",
+        move.tablet_uuid,
+        src_ts_uuid,
+        dst_ts_uuid,
+        moves_per_tserver_[src_ts_uuid],
+        dst_ts_uuid.empty() ? 0 : moves_per_tserver_[dst_ts_uuid]);
+
     replica_moves->erase(replica_moves->begin() + indexes_to_remove[j]);
   }
 
diff --git a/src/kudu/master/auto_rebalancer.h 
b/src/kudu/master/auto_rebalancer.h
index ecd7d3ca2..64cb5562f 100644
--- a/src/kudu/master/auto_rebalancer.h
+++ b/src/kudu/master/auto_rebalancer.h
@@ -22,6 +22,7 @@
 #include <optional>
 #include <random>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "kudu/gutil/port.h"
@@ -186,6 +187,10 @@ class AutoRebalancerTask {
   std::random_device random_device_;
   std::mt19937 random_generator_;
 
+  // Track the number of ongoing moves per tablet server (both source and 
destination).
+  // Key is the tserver UUID, value is the count of ongoing moves.
+  std::unordered_map<std::string, int> moves_per_tserver_;
+
   // Variables for testing.
   std::atomic<int> number_of_loop_iterations_for_test_;
   std::atomic<int> moves_scheduled_this_round_for_test_;

Reply via email to