This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 775a130 auto_rebalancer: ignore deleted tables
775a130 is described below
commit 775a130963a9a7915b6be9fbd727274d152caed4
Author: Andrew Wong <[email protected]>
AuthorDate: Wed May 20 10:50:36 2020 -0700
auto_rebalancer: ignore deleted tables
Previously, the auto-rebalancer would not do any rebalancing if any
table in the cluster had been deleted, because the tablets of deleted
tables are not RUNNING. This commit updates the auto-rebalancer to skip
deleted (REMOVED) tables when building its view of the cluster, so their
replicas are no longer considered.
Change-Id: Ib5499b09c677dd6c1016349398952a1c39820691
Reviewed-on: http://gerrit.cloudera.org:8080/15970
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/master/auto_rebalancer-test.cc | 113 ++++++++++++++++++++++++++++----
src/kudu/master/auto_rebalancer.cc | 6 +-
2 files changed, 104 insertions(+), 15 deletions(-)
diff --git a/src/kudu/master/auto_rebalancer-test.cc b/src/kudu/master/auto_rebalancer-test.cc
index cd3ce57..456c840 100644
--- a/src/kudu/master/auto_rebalancer-test.cc
+++ b/src/kudu/master/auto_rebalancer-test.cc
@@ -22,12 +22,15 @@
#include <set>
#include <string>
#include <unordered_map>
+#include <unordered_set>
#include <vector>
+#include <boost/optional/optional.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/client/client.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
@@ -52,10 +55,14 @@
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
+using kudu::itest::GetTableLocations;
+using kudu::itest::ListTabletServers;
+using kudu::master::GetTableLocationsResponsePB;
using std::set;
using std::string;
using std::unique_ptr;
using std::unordered_map;
+using std::unordered_set;
using std::vector;
using strings::Substitute;
@@ -228,7 +235,7 @@ class AutoRebalancerTest : public KuduTest {
auto_rebalancer()->moves_scheduled_this_round_for_test_;
}
- void SetupWorkLoad(int num_tablets, int num_replicas) {
+ void CreateWorkloadTable(int num_tablets, int num_replicas) {
workload_.reset(new TestWorkload(cluster_.get()));
workload_->set_num_tablets(num_tablets);
workload_->set_num_replicas(num_replicas);
@@ -258,7 +265,7 @@ TEST_F(AutoRebalancerTest, OnlyLeaderDoesAutoRebalancing) {
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
- SetupWorkLoad(kNumTablets, /*num_replicas*/1);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/1);
int leader_idx;
ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
@@ -286,7 +293,7 @@ TEST_F(AutoRebalancerTest, NextLeaderResumesAutoRebalancing) {
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
- SetupWorkLoad(kNumTablets, /*num_replicas*/1);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/1);
// Verify that non-leaders are not performing rebalancing,
// then take down the leader master.
@@ -320,7 +327,7 @@ TEST_F(AutoRebalancerTest, MovesScheduledIfAddTserver) {
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
- SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
NO_FATALS(CheckNoMovesScheduled());
// Take a snapshot of the number of copy sources started on the original
@@ -379,9 +386,9 @@ TEST_F(AutoRebalancerTest, NoReplicaMovesIfLocationLoadSkewedByOne) {
const auto timeout = MonoDelta::FromSeconds(30);
vector<master::ListTabletServersResponsePB_Entry> tservers;
- ASSERT_OK(itest::ListTabletServers(cluster_->master_proxy(),
- timeout,
- &tservers));
+ ASSERT_OK(ListTabletServers(cluster_->master_proxy(),
+ timeout,
+ &tservers));
set<string> locations;
for (const auto& tserver : tservers) {
const auto& ts_location = tserver.location();
@@ -390,7 +397,7 @@ TEST_F(AutoRebalancerTest, NoReplicaMovesIfLocationLoadSkewedByOne) {
}
ASSERT_EQ(kNumTservers, locations.size());
- SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
NO_FATALS(CheckNoMovesScheduled());
}
@@ -423,7 +430,7 @@ TEST_F(AutoRebalancerTest, NoReplicaMovesIfCannotFixPlacementPolicy) {
}
ASSERT_EQ(kNumLocations, locations.size());
- SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
NO_FATALS(CheckNoMovesScheduled());
}
@@ -444,7 +451,7 @@ TEST_F(AutoRebalancerTest, TestMaxMovesPerServer) {
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
- SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
workload_->Start();
while (workload_->rows_inserted() < 1000) {
SleepFor(MonoDelta::FromMilliseconds(100));
@@ -503,7 +510,7 @@ TEST_F(AutoRebalancerTest, AutoRebalancingUnstableCluster) {
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
- SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
// Inject latency to make tablets' Raft configurations unstable due to
// frequent leader re-elections.
@@ -543,7 +550,7 @@ TEST_F(AutoRebalancerTest, NoReplicaMovesIfCannotMeetReplicationFactor) {
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
- SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
// Take down a tserver, to make it impossible to meet RF=3 with 2 tservers.
NO_FATALS(cluster_->mini_tablet_server(0)->Shutdown());
@@ -567,7 +574,7 @@ TEST_F(AutoRebalancerTest, NoRebalancingIfReplicasRecovering) {
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
- SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
// Kill a tablet server so when we add a tablet server, auto-rebalancing will
// do nothing. Also wait a bit until the tserver is deemed unresponsive so
@@ -594,7 +601,7 @@ TEST_F(AutoRebalancerTest, TestHandlingFailedTservers) {
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
- SetupWorkLoad(kNumTablets, /*num_replicas*/3);
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
// Take down all the original tservers.
for (int i = 0; i < kNumTservers; ++i) {
@@ -643,5 +650,83 @@ TEST_F(AutoRebalancerTest, TestHandlingFailedTservers) {
}
}
+// Test that we schedule moves even if some tablets belong to deleted tables.
+// The moves scheduled shouldn't move replicas of deleted tables.
+TEST_F(AutoRebalancerTest, TestDeletedTables) {
+ // Set a low timeout for an unresponsive tserver to be presumed dead by the
+ // master and shorten the interval for recovery re-replication to begin.
+ FLAGS_tserver_unresponsive_timeout_ms = 1000;
+ FLAGS_follower_unavailable_considered_failed_sec = 1;
+ const int kNumTServers = 3;
+ const int kNumTablets = 4;
+ cluster_opts_.num_tablet_servers = kNumTServers;
+ ASSERT_OK(CreateAndStartCluster());
+ NO_FATALS(CheckAutoRebalancerStarted());
+
+ // Create some tablets across multiple tables, so we can test rebalancing
+ // when a table is deleted.
+ CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
+ const string kNewTableName = "dugtrio";
+ {
+ TestWorkload w(cluster_.get());
+ w.set_num_tablets(kNumTablets);
+ w.set_table_name(kNewTableName);
+ w.Setup();
+ }
+ const int initial_bytes_sent_in_orig_tservers =
+ AggregateMetricCounts(GetBytesSentByTServer(), 0, kNumTServers);
+ // Kill a tablet server so when we add a tablet server, the auto-rebalancer
+ // see an unhealthy cluster and do nothing, waiting a bit until the master
+ // deems the server unresponsive.
+ NO_FATALS(cluster_->mini_tablet_server(0)->Shutdown());
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_tserver_unresponsive_timeout_ms));
+
+ // Delete one of the tables after figuring out what its tablet IDs are.
+ GetTableLocationsResponsePB table_locs;
+ ASSERT_OK(GetTableLocations(cluster_->master_proxy(), kNewTableName,
+ MonoDelta::FromSeconds(10),
ReplicaTypeFilter::ANY_REPLICA,
+ /*table_id*/boost::none, &table_locs));
+ unordered_set<string> deleted_tablet_ids;
+ for (const auto& t : table_locs.tablet_locations()) {
+ EmplaceIfNotPresent(&deleted_tablet_ids, t.tablet_id());
+ }
+ ASSERT_OK(workload_->client()->DeleteTable(kNewTableName));
+
+ // Add a new tablet server and restart the stopped server so the rebalancer
+ // can proceed.
+ ASSERT_OK(cluster_->AddTabletServer());
+ ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+ NO_FATALS(CheckSomeMovesScheduled());
+
+ // Even though not all tables are running, this should succeed because the
+ // table that isn't running has been deleted.
+ ASSERT_EVENTUALLY([&] {
+ int bytes_sent_in_orig_tservers =
+ AggregateMetricCounts(GetBytesSentByTServer(), 0, kNumTServers);
+ ASSERT_GT(bytes_sent_in_orig_tservers,
initial_bytes_sent_in_orig_tservers);
+ });
+ ASSERT_EVENTUALLY([&] {
+ int bytes_fetched_in_new_tservers =
+ AggregateMetricCounts(GetBytesFetchedByTServer(), kNumTServers,
+ cluster_->num_tablet_servers());
+ ASSERT_GT(bytes_fetched_in_new_tservers, 0);
+ });
+
+ // Wait for the replicas to balance out.
+ // NOTE: a skew of 1 is acceptable.
+ vector<string> tablets_on_new_ts;
+ ASSERT_EVENTUALLY([&] {
+ tablets_on_new_ts =
cluster_->mini_tablet_server(kNumTServers)->ListTablets();
+ ASSERT_NEAR(kNumTablets * 3 / cluster_->num_tablet_servers(),
tablets_on_new_ts.size(), 1);
+ });
+ // The tablets moved to the new server shouldn't belong to the deleted table.
+ for (const auto& tid : tablets_on_new_ts) {
+ ASSERT_FALSE(ContainsKey(deleted_tablet_ids, tid)) <<
+ Substitute("Tablets from deleted table: $0, tablets on new server: $1",
+ JoinStrings(deleted_tablet_ids, ","),
+ JoinStrings(tablets_on_new_ts, ","));
+ }
+}
+
} // namespace master
} // namespace kudu
diff --git a/src/kudu/master/auto_rebalancer.cc b/src/kudu/master/auto_rebalancer.cc
index 0509652..bb90426 100644
--- a/src/kudu/master/auto_rebalancer.cc
+++ b/src/kudu/master/auto_rebalancer.cc
@@ -495,9 +495,13 @@ Status AutoRebalancerTask::BuildClusterRawInfo(
for (const auto& table : table_infos) {
TableMetadataLock table_l(table.get(), LockMode::READ);
+ const SysTablesEntryPB& table_data = table->metadata().state().pb;
+ if (table_data.state() == SysTablesEntryPB::REMOVED) {
+ // Don't worry about rebalancing replicas that belong to deleted tables.
+ continue;
+ }
TableSummary table_summary;
table_summary.id = table->id();
- const SysTablesEntryPB& table_data = table->metadata().state().pb;
table_summary.name = table_data.name();
table_summary.replication_factor = table_data.num_replicas();