This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 38c2e12 [tools] a small cleanup on ksck code
38c2e12 is described below
commit 38c2e122292b262a4a2ee1af931b27dd272bd862
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue May 14 11:57:27 2019 -0700
[tools] a small cleanup on ksck code
Change-Id: I14d2451cc757c95543be6fa8c7dc5c95c7d73be8
Reviewed-on: http://gerrit.cloudera.org:8080/13379
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Adar Dembo <[email protected]>
---
src/kudu/tools/ksck.cc | 121 +++++++++++++++++++++---------------------
src/kudu/tools/ksck.h | 9 ++--
src/kudu/tools/ksck_remote.cc | 76 +++++++++++++-------------
3 files changed, 101 insertions(+), 105 deletions(-)
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 0a77cc0..134319f 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -18,6 +18,8 @@
#include "kudu/tools/ksck.h"
#include <algorithm>
+#include <atomic>
+#include <cstddef>
#include <cstdint>
#include <iostream>
#include <iterator>
@@ -41,7 +43,6 @@
#include "kudu/tools/color.h"
#include "kudu/tools/ksck_checksum.h"
#include "kudu/tools/tool_action_common.h"
-#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/string_case.h"
@@ -70,14 +71,12 @@ DEFINE_string(ksck_format, "plain_concise",
DEFINE_bool(consensus, true,
"Whether to check the consensus state from each tablet against the
master.");
+using std::atomic;
using std::cout;
-using std::endl;
using std::ostream;
using std::ostringstream;
-using std::set;
using std::shared_ptr;
using std::string;
-using std::unordered_map;
using std::vector;
using strings::Substitute;
@@ -154,20 +153,20 @@ std::ostream& operator<<(std::ostream& lhs,
KsckFetchState state) {
Ksck::Ksck(shared_ptr<KsckCluster> cluster, ostream* out)
: cluster_(std::move(cluster)),
out_(out == nullptr ? &std::cout : out) {
- CHECK_OK(ThreadPoolBuilder("Ksck-fetch")
+ CHECK_OK(ThreadPoolBuilder("ksck-fetch")
.set_max_threads(FLAGS_fetch_info_concurrency)
.set_idle_timeout(MonoDelta::FromMilliseconds(10))
.Build(&pool_));
}
Status Ksck::CheckMasterHealth() {
- int num_masters = static_cast<int>(cluster_->masters().size());
+ const auto num_masters = cluster_->masters().size();
if (num_masters == 0) {
- return Status::NotFound("No masters found");
+ return Status::NotFound("no masters found");
}
- AtomicInt<int32_t> bad_masters(0);
- AtomicInt<int32_t> unauthorized_masters(0);
+ atomic<size_t> bad_masters(0);
+ atomic<size_t> unauthorized_masters(0);
vector<KsckServerHealthSummary> master_summaries;
simple_spinlock master_summaries_lock;
@@ -185,16 +184,16 @@ Status Ksck::CheckMasterHealth() {
if (!s.ok()) {
if (IsNotAuthorizedMethodAccess(s)) {
sh.health = KsckServerHealth::UNAUTHORIZED;
- unauthorized_masters.Increment();
+ ++unauthorized_masters;
} else {
sh.health = KsckServerHealth::UNAVAILABLE;
}
- bad_masters.Increment();
+ ++bad_masters;
}
{
std::lock_guard<simple_spinlock> lock(master_summaries_lock);
- master_summaries.push_back(std::move(sh));
+ master_summaries.emplace_back(std::move(sh));
}
// Fetch the flags information.
@@ -202,7 +201,7 @@ Status Ksck::CheckMasterHealth() {
s = master->FetchUnusualFlags();
if (!s.ok()) {
std::lock_guard<simple_spinlock> lock(master_summaries_lock);
- results_.warning_messages.push_back(s.CloneAndPrepend(Substitute(
+ results_.warning_messages.emplace_back(s.CloneAndPrepend(Substitute(
"unable to get flag information for master $0 ($1)",
master->uuid(),
master->address())));
@@ -215,16 +214,16 @@ Status Ksck::CheckMasterHealth() {
// Return a NotAuthorized status if any master has auth errors, since this
// indicates ksck may not be able to gather full and accurate info.
- if (unauthorized_masters.Load() > 0) {
+ if (unauthorized_masters > 0) {
return Status::NotAuthorized(
Substitute("failed to gather info from $0 of $1 "
"masters due to lack of admin privileges",
- unauthorized_masters.Load(), num_masters));
+ unauthorized_masters.load(), num_masters));
}
- if (bad_masters.Load() > 0) {
+ if (bad_masters > 0) {
return Status::NetworkError(
Substitute("failed to gather info from all masters: $0 of $1 had
errors",
- bad_masters.Load(), num_masters));
+ bad_masters.load(), num_masters));
}
return Status::OK();
}
@@ -323,16 +322,15 @@ Status Ksck::FetchTableAndTabletInfo() {
}
Status Ksck::FetchInfoFromTabletServers() {
- VLOG(1) << "Fetching the list of tablet servers";
- int servers_count = static_cast<int>(cluster_->tablet_servers().size());
+ const auto servers_count = cluster_->tablet_servers().size();
VLOG(1) << Substitute("List of $0 tablet servers retrieved", servers_count);
if (servers_count == 0) {
return Status::OK();
}
- AtomicInt<int32_t> bad_servers(0);
- AtomicInt<int32_t> unauthorized_servers(0);
+ atomic<size_t> bad_servers(0);
+ atomic<size_t> unauthorized_servers(0);
VLOG(1) << "Fetching info from all " << servers_count << " tablet servers";
vector<KsckServerHealthSummary> tablet_server_summaries;
@@ -341,44 +339,45 @@ Status Ksck::FetchInfoFromTabletServers() {
for (const auto& entry : cluster_->tablet_servers()) {
const auto& ts = entry.second;
RETURN_NOT_OK(pool_->SubmitFunc([&]() {
- VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
- KsckServerHealth health;
- Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
- if (FLAGS_consensus) {
- return ts->FetchConsensusState(&health);
- }
- return Status::OK();
- });
- KsckServerHealthSummary summary;
- summary.uuid = ts->uuid();
- summary.address = ts->address();
- summary.ts_location = ts->location();
- summary.version = ts->version();
- summary.status = s;
- if (!s.ok()) {
- if (IsNotAuthorizedMethodAccess(s)) {
- health = KsckServerHealth::UNAUTHORIZED;
- unauthorized_servers.Increment();
+ VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
+ KsckServerHealth health;
+ Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
+ if (FLAGS_consensus) {
+ return ts->FetchConsensusState(&health);
}
- bad_servers.Increment();
+ return Status::OK();
+ });
+ KsckServerHealthSummary summary;
+ summary.uuid = ts->uuid();
+ summary.address = ts->address();
+ summary.ts_location = ts->location();
+ summary.version = ts->version();
+ summary.status = s;
+ if (!s.ok()) {
+ if (IsNotAuthorizedMethodAccess(s)) {
+ health = KsckServerHealth::UNAUTHORIZED;
+ ++unauthorized_servers;
}
- summary.health = health;
+ ++bad_servers;
+ }
+ summary.health = health;
- {
- std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
- tablet_server_summaries.push_back(std::move(summary));
- }
+ {
+ std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
+ tablet_server_summaries.push_back(std::move(summary));
+ }
- // Fetch the flags information.
- // Failing to gather flags is only a warning.
- s = ts->FetchUnusualFlags();
- if (!s.ok()) {
- std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
- results_.warning_messages.push_back(s.CloneAndPrepend(Substitute(
- "unable to get flag information for tablet server $0 ($1)",
- ts->uuid(),
- ts->address())));
- }
+ // Fetch the flags information.
+ // Failing to gather flags is only a warning since fetching the flags
+ // is not supported in older versions.
+ s = ts->FetchUnusualFlags();
+ if (!s.ok()) {
+ std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
+ results_.warning_messages.emplace_back(s.CloneAndPrepend(Substitute(
+ "unable to get flag information for tablet server $0 ($1)",
+ ts->uuid(),
+ ts->address())));
+ }
}));
}
pool_->Wait();
@@ -387,16 +386,16 @@ Status Ksck::FetchInfoFromTabletServers() {
// Return a NotAuthorized status if any tablet server has auth errors, since
// this indicates ksck may not be able to gather full and accurate info.
- if (unauthorized_servers.Load() > 0) {
+ if (unauthorized_servers > 0) {
return Status::NotAuthorized(
Substitute("failed to gather info from $0 of $1 tablet servers due "
"to lack of admin privileges",
- unauthorized_servers.Load(), servers_count));
+ unauthorized_servers.load(), servers_count));
}
- if (bad_servers.Load() > 0) {
+ if (bad_servers > 0) {
return Status::NetworkError(
- Substitute("failed to gather info for all tablet servers: $0 of $1 had
errors",
- bad_servers.Load(), servers_count));
+ Substitute("failed to gather info for all tablet servers: $0 of $1 had
errors",
+ bad_servers.load(), servers_count));
}
return Status::OK();
}
@@ -594,7 +593,7 @@ Status Ksck::ChecksumData(const KsckChecksumOptions& opts) {
}
bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {
- const auto all_tablets = table->tablets();
+ const auto& all_tablets = table->tablets();
vector<shared_ptr<KsckTablet>> tablets;
std::copy_if(all_tablets.begin(), all_tablets.end(),
std::back_inserter(tablets),
[&](const shared_ptr<KsckTablet>& t) {
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index ca788ab..181af26 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -123,11 +123,12 @@ class KsckTablet {
// Representation of a table. Composed of tablets.
class KsckTable {
public:
- KsckTable(std::string id, std::string name, const Schema& schema, int
num_replicas)
+ KsckTable(std::string id, std::string name, Schema schema, int num_replicas)
: id_(std::move(id)),
name_(std::move(name)),
- schema_(schema),
- num_replicas_(num_replicas) {}
+ schema_(std::move(schema)),
+ num_replicas_(num_replicas) {
+ }
const std::string& id() const {
return id_;
@@ -149,7 +150,7 @@ class KsckTable {
tablets_ = std::move(tablets);
}
- std::vector<std::shared_ptr<KsckTablet>>& tablets() {
+ const std::vector<std::shared_ptr<KsckTablet>>& tablets() const {
return tablets_;
}
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index f217108..efa2ba6 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -61,7 +61,6 @@
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_service.pb.h"
#include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
@@ -74,27 +73,26 @@ DECLARE_int32(fetch_info_concurrency);
DEFINE_bool(checksum_cache_blocks, false, "Should the checksum scanners cache
the read blocks");
-namespace kudu {
-namespace tools {
-
-static const std::string kMessengerName = "ksck";
-
-using client::KuduClient;
-using client::KuduClientBuilder;
-using client::KuduReplica;
-using client::KuduScanToken;
-using client::KuduScanTokenBuilder;
-using client::KuduTable;
-using client::KuduTabletServer;
-using client::internal::ReplicaController;
-using rpc::Messenger;
-using rpc::MessengerBuilder;
-using rpc::RpcController;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduScanToken;
+using kudu::client::KuduScanTokenBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduTabletServer;
+using kudu::client::internal::ReplicaController;
+using kudu::rpc::Messenger;
+using kudu::rpc::MessengerBuilder;
+using kudu::rpc::RpcController;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
+namespace kudu {
+namespace tools {
+
+static const std::string kMessengerName = "ksck";
+
namespace {
MonoDelta GetDefaultTimeout() {
return MonoDelta::FromMilliseconds(FLAGS_timeout_ms);
@@ -516,54 +514,52 @@ Status RemoteKsckCluster::RetrieveTablesList() {
vector<string> table_names;
RETURN_NOT_OK(client_->ListTables(&table_names));
- int num_tables = static_cast<int>(table_names.size());
- if (num_tables == 0) {
+ if (table_names.empty()) {
return Status::OK();
}
- AtomicInt<int32_t> bad_tables(0);
vector<shared_ptr<KsckTable>> tables;
+ tables.reserve(table_names.size());
simple_spinlock tables_lock;
for (const auto& table_name : table_names) {
RETURN_NOT_OK(pool_->SubmitFunc([&]() {
- client::sp::shared_ptr<KuduTable> t;
- Status s = client_->OpenTable(table_name, &t);
- if (!s.ok()) {
- bad_tables.Increment();
- LOG(ERROR) << Substitute("unable to open table $0: $1", table_name,
s.ToString());
- return;
- }
- shared_ptr<KsckTable> table(new KsckTable(t->id(),
- table_name,
- *t->schema().schema_,
- t->num_replicas()));
- {
- std::lock_guard<simple_spinlock> l(tables_lock);
- tables.push_back(table);
- }
+ client::sp::shared_ptr<KuduTable> t;
+ Status s = client_->OpenTable(table_name, &t);
+ if (!s.ok()) {
+ LOG(ERROR) << Substitute("unable to open table $0: $1",
+ table_name, s.ToString());
+ return;
+ }
+ shared_ptr<KsckTable> table(new KsckTable(t->id(),
+ table_name,
+
KuduSchema::ToSchema(t->schema()),
+ t->num_replicas()));
+ {
+ std::lock_guard<simple_spinlock> l(tables_lock);
+ tables.emplace_back(std::move(table));
+ }
}));
}
pool_->Wait();
tables_.swap(tables);
- if (bad_tables.Load() > 0) {
+ if (tables_.size() < table_names.size()) {
return Status::NetworkError(
Substitute("failed to gather info from all tables: $0 of $1 had
errors",
- bad_tables.Load(), num_tables));
+ table_names.size() - tables_.size(), table_names.size()));
}
return Status::OK();
}
Status RemoteKsckCluster::RetrieveAllTablets() {
- int num_tables = static_cast<int>(tables().size());
- if (num_tables == 0) {
+ if (tables_.empty()) {
return Status::OK();
}
- for (const shared_ptr<KsckTable>& table : tables()) {
+ for (const auto& table : tables_) {
RETURN_NOT_OK(pool_->SubmitFunc(
std::bind(&KsckCluster::RetrieveTabletsList, this, table)));
}