This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 15ecb24 KUDU-2842: don't reference CowLock state from TSInfosDict
15ecb24 is described below
commit 15ecb240d8055743090f8739e8e29dd8b9fea0b3
Author: Andrew Wong <[email protected]>
AuthorDate: Thu Sep 19 01:01:47 2019 -0700
KUDU-2842: don't reference CowLock state from TSInfosDict
The following race was previously possible:
Tablet A belongs to table T
1. T1: receive GetTableLocations for table T
2. T1: lock A in READ mode
3. T1: add StringPiece(A) => TSInfoPB to dict, where StringPiece(A) points
to the CowLock state
4. T1: unlock A in READ mode
5. T2: receive ProcessFullTabletReport that has an updated cstate for A
6. T2: lock A in WRITE mode
7. T2: COMMIT mutation to A based on the report, blowing away old in-memory
CowLock state, and unlock A
8. T1: try to update dict, but key StringPiece(A) is corrupted
This patch addresses this by making the following change:
3. T1: add StringPiece(A) => TSInfoPB to dict, where StringPiece(A)
points into the TSInfoPB
This patch adds a variant of the ComputeIfAbsent() map utility that
facilitates this self-referential map insertion.
Testing:
- adds a test for the new map function
- adds a test that runs many elections while concurrently hammering the
GetTableLocations endpoint; without this patch, the test would hit
data races
- I also reran the benchmark posted in 586e957f7 to verify this doesn't
regress
performance
With this patch:
================
Count: 57230
Mean: 1341.46
Percentiles:
0% (min) = 69
25% = 772
50% (med) = 1408
75% = 1728
95% = 2240
99% = 2736
99.9% = 3616
99.99% = 5440
100% (max) = 11976
Without this patch:
===================
Count: 56325
Mean: 1360.05
Percentiles:
0% (min) = 122
25% = 980
50% (med) = 1408
75% = 1712
95% = 2192
99% = 2656
99.9% = 3392
99.99% = 5088
100% (max) = 8544
Change-Id: I30f4cd2eb8439e1923c1c2617248514354561d16
Reviewed-on: http://gerrit.cloudera.org:8080/14263
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins
---
src/kudu/gutil/map-util.h | 43 +++++-
.../integration-tests/ts_tablet_manager-itest.cc | 159 +++++++++++++++------
src/kudu/master/catalog_manager.cc | 11 +-
src/kudu/util/map-util-test.cc | 36 +++++
4 files changed, 199 insertions(+), 50 deletions(-)
diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index 1876547..7b6d7bb 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -875,20 +875,47 @@ void AppendValuesFromMap(const MapContainer&
map_container,
// MyValue* const value = result.first;
// if (result.second) ....
//
+// The ComputePair* variants expect a lambda that creates a pair<k, v>. This
+// can be useful if the key is a StringPiece pointing to external state to
+// avoid excess memory for the keys, while being safer in multi-threaded
+// contexts, e.g. in case the key goes out of scope before the container does.
+//
+// Example usage:
+//
+// map<StringPiece, int, GoodFastHash<StringPiece>> string_to_idx;
+// vector<unique_ptr<StringPB>> pbs;
+// auto result = ComputePairIfAbsentReturnAbsense(&string_to_idx, my_key,
+// [&]() {
+// unique_ptr<StringPB> s(new StringPB());
+// s->set_string(my_key);
+// int idx = pbs.size();
+// pbs.emplace_back(s.release());
+// return make_pair(StringPiece(pbs.back()->string()), idx);
+// });
template <class MapContainer, typename Function>
std::pair<typename MapContainer::mapped_type* const, bool>
-ComputeIfAbsentReturnAbsense(MapContainer* container,
- const typename MapContainer::key_type& key,
- Function compute_func) {
+ComputePairIfAbsentReturnAbsense(MapContainer* container,
+ const typename MapContainer::key_type& key,
+ Function compute_pair_func) {
typename MapContainer::iterator iter = container->find(key);
bool new_value = iter == container->end();
if (new_value) {
+ auto p = compute_pair_func();
std::pair<typename MapContainer::iterator, bool> result =
- container->emplace(key, compute_func());
+ container->emplace(std::move(p.first), std::move(p.second));
DCHECK(result.second) << "duplicate key: " << key;
iter = result.first;
}
return std::make_pair(&iter->second, new_value);
+}
+template <class MapContainer, typename Function>
+std::pair<typename MapContainer::mapped_type* const, bool>
+ComputeIfAbsentReturnAbsense(MapContainer* container,
+ const typename MapContainer::key_type& key,
+ Function compute_func) {
+ return ComputePairIfAbsentReturnAbsense(container, key, [&key,
&compute_func] {
+ return std::make_pair(key, compute_func());
+ });
};
// Like the above but doesn't return a pair, just returns a pointer to the
value.
@@ -906,4 +933,12 @@ ComputeIfAbsent(MapContainer* container,
return ComputeIfAbsentReturnAbsense(container, key, compute_func).first;
};
+template <class MapContainer, typename Function>
+typename MapContainer::mapped_type* const
+ComputePairIfAbsent(MapContainer* container,
+ const typename MapContainer::key_type& key,
+ Function compute_pair_func) {
+ return ComputePairIfAbsentReturnAbsense<MapContainer, Function>(container,
key, compute_pair_func).first;
+};
+
#endif // UTIL_GTL_MAP_UTIL_H_
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 6f42415..d728d46 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -23,6 +23,7 @@
#include <memory>
#include <ostream>
#include <string>
+#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -41,6 +42,7 @@
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
@@ -56,6 +58,7 @@
#include "kudu/master/table_metrics.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/heartbeater.h"
@@ -63,12 +66,14 @@
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tablet_server_options.h"
#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -104,6 +109,8 @@ using kudu::consensus::RaftPeerPB;
using kudu::itest::SimpleIntKeyKuduSchema;
using kudu::KuduPartialRow;
using kudu::master::CatalogManager;
+using kudu::master::GetTableLocationsResponsePB;
+using kudu::master::GetTableLocationsRequestPB;
using kudu::master::Master;
using kudu::master::MasterServiceProxy;
using kudu::master::ReportedTabletPB;
@@ -117,6 +124,7 @@ using kudu::ClusterVerifier;
using std::map;
using std::shared_ptr;
using std::string;
+using std::thread;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
@@ -268,51 +276,125 @@ INSTANTIATE_TEST_CASE_P(,
FailedTabletsAreReplacedITest,
::testing::Bool());
-// Test that when the leader changes, the tablet manager gets notified and
-// includes that information in the next tablet report.
-TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
+class LeadershipChangeReportingTest : public TsTabletManagerITest {
+ public:
const int kNumReplicas = 2;
- {
+ void SetUp() override {
+ NO_FATALS(TsTabletManagerITest::SetUp());
+
+ // For tests that heartbeat, set a lower interval to speed things up.
+ FLAGS_heartbeat_interval_ms = 100;
+
InternalMiniClusterOptions opts;
opts.num_tablet_servers = kNumReplicas;
NO_FATALS(StartCluster(std::move(opts)));
- }
- // We need to control elections precisely for this test since we're using
- // EmulateElection() with a distributed consensus configuration.
- FLAGS_enable_leader_failure_detection = false;
- FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;
+ // We need to control elections precisely for this test since we're using
+ // EmulateElection() with a distributed consensus configuration.
+ FLAGS_enable_leader_failure_detection = false;
+ FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;
- // Allow creating table with even replication factor.
- FLAGS_allow_unsafe_replication_factor = true;
+ // Allow creating table with even replication factor.
+ FLAGS_allow_unsafe_replication_factor = true;
- // Run a few more iters in slow-test mode.
- OverrideFlagForSlowTests("num_election_test_loops", "10");
+ // Run a few more iters in slow-test mode.
+ OverrideFlagForSlowTests("num_election_test_loops", "10");
- // Create the table.
- client::sp::shared_ptr<KuduTable> table;
- unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
- ASSERT_OK(table_creator->table_name(kTableName)
- .schema(&schema_)
- .set_range_partition_columns({ "key" })
- .num_replicas(kNumReplicas)
- .Create());
- ASSERT_OK(client_->OpenTable(kTableName, &table));
+ // Build a TServerDetails map so we can check for convergence.
+ const auto& addr = cluster_->mini_master()->bound_rpc_addr();
+ master_proxy_.reset(new MasterServiceProxy(client_messenger_, addr,
addr.host()));
+ }
- // Build a TServerDetails map so we can check for convergence.
- const auto& addr = cluster_->mini_master()->bound_rpc_addr();
- shared_ptr<MasterServiceProxy> master_proxy(
- new MasterServiceProxy(client_messenger_, addr, addr.host()));
+ // Creates 'num_hashes' tablets with two replicas each.
+ Status CreateTable(int num_hashes) {
+ // Create the table.
+ client::sp::shared_ptr<KuduTable> table;
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ if (num_hashes > 1) {
+ RETURN_NOT_OK(table_creator->table_name(kTableName)
+ .schema(&schema_)
+ .set_range_partition_columns({ "key" })
+ .num_replicas(kNumReplicas)
+ .add_hash_partitions({ "key" }, num_hashes)
+ .Create());
+ } else {
+ RETURN_NOT_OK(table_creator->table_name(kTableName)
+ .schema(&schema_)
+ .set_range_partition_columns({ "key" })
+ .num_replicas(kNumReplicas)
+ .Create());
+ }
+ RETURN_NOT_OK(client_->OpenTable(kTableName, &table));
- itest::TabletServerMap ts_map;
- ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map));
- ValueDeleter deleter(&ts_map);
+ RETURN_NOT_OK(CreateTabletServerMap(master_proxy_, client_messenger_,
&ts_map_));
- // Collect the TabletReplicas so we get direct access to RaftConsensus.
- vector<scoped_refptr<TabletReplica>> tablet_replicas;
- ASSERT_OK(PrepareTabletReplicas(MonoDelta::FromSeconds(60),
&tablet_replicas));
- ASSERT_EQ(kNumReplicas, tablet_replicas.size());
+ // Collect the TabletReplicas so we get direct access to RaftConsensus.
+ RETURN_NOT_OK(PrepareTabletReplicas(MonoDelta::FromSeconds(60),
&tablet_replicas_));
+ CHECK_EQ(kNumReplicas * num_hashes, tablet_replicas_.size());
+ return Status::OK();
+ }
+ void TearDown() override {
+ ValueDeleter deleter(&ts_map_);
+ NO_FATALS(TsTabletManagerITest::TearDown());
+ }
+
+ Status TriggerElection(int min_term = 0, int* new_leader_idx = nullptr) {
+ int leader_idx = rand() % tablet_replicas_.size();
+ LOG(INFO) << "Electing peer " << leader_idx << "...";
+ RaftConsensus* con =
CHECK_NOTNULL(tablet_replicas_[leader_idx]->consensus());
+ RETURN_NOT_OK(con->EmulateElection());
+ LOG(INFO) << "Waiting for servers to agree...";
+ RETURN_NOT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5), ts_map_,
+ tablet_replicas_[leader_idx]->tablet_id(), min_term));
+ if (new_leader_idx) {
+ *new_leader_idx = leader_idx;
+ }
+ return Status::OK();
+ }
+
+ protected:
+ shared_ptr<MasterServiceProxy> master_proxy_;
+ vector<scoped_refptr<TabletReplica>> tablet_replicas_;
+ itest::TabletServerMap ts_map_;
+};
+// Regression test for KUDU-2842: concurrent calls to GetTableLocations()
+// shouldn't lead to data races with the changes in reported state.
+TEST_F(LeadershipChangeReportingTest, TestConcurrentGetTableLocations) {
+ // KUDU-2842 requires there to be multiple tablets in a given report, so
+ // create multiple tablets.
+ int kNumTablets = 2;
+ ASSERT_OK(CreateTable(kNumTablets));
+ CountDownLatch latch(1);
+ thread t([&] {
+ master::GetTableLocationsRequestPB req;
+ req.mutable_table()->set_table_name(kTableName);
+ req.set_intern_ts_infos_in_response(true);
+ while (!latch.WaitFor(MonoDelta::FromMilliseconds(10))) {
+ master::GetTableLocationsResponsePB resp;
+ rpc::RpcController rpc;
+ // Note: we only really care about data races, rather than the responses.
+ ignore_result(master_proxy_->GetTableLocations(req, &resp, &rpc));
+ }
+ });
+ SCOPED_CLEANUP({
+ latch.CountDown();
+ NO_FATALS(t.join());
+ });
+ for (int i = 0; i < FLAGS_num_election_test_loops; i++) {
+ for (int t = 0; t < kNumTablets; t++) {
+ // Note: we only really care about data races, rather than the success of
+ // all the elections.
+ ignore_result(TriggerElection());
+ }
+ SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_heartbeat_interval_ms));
+ }
+}
+
+// Test that when the leader changes, the tablet manager gets notified and
+// includes that information in the next tablet report.
+TEST_F(LeadershipChangeReportingTest, TestUpdatedConsensusState) {
+ ASSERT_OK(CreateTable(1));
// Stop heartbeating so we don't race against the Master.
DisableHeartbeatingToMaster();
@@ -320,13 +402,8 @@ TEST_F(TsTabletManagerITest,
TestReportNewLeaderOnLeaderChange) {
// TSTabletManager should acknowledge the role changes via tablet reports.
for (int i = 0; i < FLAGS_num_election_test_loops; i++) {
SCOPED_TRACE(Substitute("Iter: $0", i));
- int new_leader_idx = rand() % 2;
- LOG(INFO) << "Electing peer " << new_leader_idx << "...";
- RaftConsensus* con =
CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus());
- ASSERT_OK(con->EmulateElection());
- LOG(INFO) << "Waiting for servers to agree...";
- ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5),
- ts_map, tablet_replicas[0]->tablet_id(), i
+ 1));
+ int new_leader_idx;
+ ASSERT_OK(TriggerElection(i + 1, &new_leader_idx));
// Now check that the tablet report reports the correct role for both
servers.
for (int replica = 0; replica < kNumReplicas; replica++) {
@@ -342,7 +419,7 @@ TEST_F(TsTabletManagerITest,
TestReportNewLeaderOnLeaderChange) {
const ReportedTabletPB& reported_tablet = report.updated_tablets(0);
ASSERT_TRUE(reported_tablet.has_consensus_state());
- string uuid = tablet_replicas[replica]->permanent_uuid();
+ string uuid = tablet_replicas_[replica]->permanent_uuid();
RaftPeerPB::Role role = GetConsensusRole(uuid,
reported_tablet.consensus_state());
if (replica == new_leader_idx) {
ASSERT_EQ(RaftPeerPB::LEADER, role)
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index 34a41ed..8f2d1f9 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -4843,13 +4843,14 @@ Status CatalogManager::BuildLocationsForTablet(
dimension = l_tablet.data().pb.dimension_label();
}
if (ts_infos_dict) {
- int idx = *ComputeIfAbsent(
+ int idx = *ComputePairIfAbsent(
&ts_infos_dict->uuid_to_idx, peer.permanent_uuid(),
- [&]() -> int {
+ [&]() -> pair<StringPiece, int> {
+ auto& ts_info_pbs = ts_infos_dict->ts_info_pbs;
auto pb = make_tsinfo_pb();
- int idx = ts_infos_dict->ts_info_pbs.size();
- ts_infos_dict->ts_info_pbs.emplace_back(pb.release());
- return idx;
+ int ts_info_idx = ts_info_pbs.size();
+ ts_info_pbs.emplace_back(pb.release());
+ return { ts_info_pbs.back()->permanent_uuid(), ts_info_idx };
});
auto* interned_replica_pb = locs_pb->add_interned_replicas();
diff --git a/src/kudu/util/map-util-test.cc b/src/kudu/util/map-util-test.cc
index 78a3484..51b428e 100644
--- a/src/kudu/util/map-util-test.cc
+++ b/src/kudu/util/map-util-test.cc
@@ -22,14 +22,22 @@
#include <map>
#include <memory>
#include <string>
+#include <unordered_map>
#include <utility>
+#include <vector>
#include <gtest/gtest.h>
+#include "kudu/gutil/strings/stringpiece.h"
+
+template <class X> struct GoodFastHash;
+
using std::map;
using std::string;
using std::shared_ptr;
using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
namespace kudu {
@@ -69,6 +77,34 @@ TEST(ComputeIfAbsentTest,
TestComputeIfAbsentAndReturnAbsense) {
ASSERT_EQ(*result2.first, "hello_world");
}
+namespace {
+// Simple struct to act as a container for a string. While not necessary per
+// se, this is more representative of the expected usage of
+// ComputePairIfAbsent* (i.e. pointing to internal state of more complex
+// objects).
+struct SimpleStruct {
+ string str;
+};
+} // anonymous namespace
+
+TEST(ComputePairIfAbsentTest, TestComputePairDestructState) {
+ unordered_map<StringPiece, int, GoodFastHash<StringPiece>> string_to_idx;
+ const string kBigKey = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
+ string big_key = kBigKey;
+ vector<SimpleStruct> big_structs;
+ auto result = ComputePairIfAbsentReturnAbsense(&string_to_idx, big_key,
+ [&] () -> std::pair<StringPiece, int> {
+ int idx = big_structs.size();
+ big_structs.emplace_back(SimpleStruct({ big_key }));
+ return { big_structs.back().str, idx };
+ });
+ // Clear the original key state. This shouldn't have any effect on the map.
+ big_key.clear();
+ ASSERT_TRUE(result.second);
+ ASSERT_EQ(*result.first, FindOrDie(string_to_idx, kBigKey));
+ ASSERT_EQ(kBigKey, big_structs[*result.first].str);
+}
+
TEST(FindPointeeOrNullTest, TestFindPointeeOrNull) {
map<string, unique_ptr<string>> my_map;
auto iter = my_map.emplace("key", unique_ptr<string>(new
string("hello_world")));