Repository: kudu Updated Branches: refs/heads/master ec654c49f -> da28d0aee
[tools] ksck checksums: Add KsckChecksummer class This moves the remaining checksum logic out of Ksck and into a new KsckChecksummer class, along with refactoring some parts of the logic into separate functions. There are no functional changes beyond a spacing fix in the "No tablet replicas found." error message. The logic that was not refactored is directly related to KUDU-2179, and will be addressed in a follow-up patch. Change-Id: I2016936eaa26fd6b499783e7d5d8f404816b37fa Reviewed-on: http://gerrit.cloudera.org:8080/11498 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> Reviewed-by: Andrew Wong <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/63e4a0a7 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/63e4a0a7 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/63e4a0a7 Branch: refs/heads/master Commit: 63e4a0a700eeb2677f232bc472b36769be3e0382 Parents: ec654c4 Author: Will Berkeley <[email protected]> Authored: Fri Sep 21 16:45:35 2018 -0700 Committer: Will Berkeley <[email protected]> Committed: Wed Oct 3 18:04:36 2018 +0000 ---------------------------------------------------------------------- src/kudu/tools/ksck.cc | 191 ++----------------------- src/kudu/tools/ksck.h | 6 +- src/kudu/tools/ksck_checksum.cc | 263 ++++++++++++++++++++++++++++++++++- src/kudu/tools/ksck_checksum.h | 65 +++++++++ 4 files changed, 337 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc index a5ed001..4597668 100644 --- a/src/kudu/tools/ksck.cc +++ b/src/kudu/tools/ksck.cc @@ -35,7 +35,6 @@ #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/join.h" #include 
"kudu/gutil/strings/substitute.h" #include "kudu/tablet/tablet.pb.h" @@ -43,9 +42,7 @@ #include "kudu/tools/ksck_checksum.h" #include "kudu/tools/tool_action_common.h" #include "kudu/util/atomic.h" -#include "kudu/util/blocking_queue.h" #include "kudu/util/locks.h" -#include "kudu/util/monotime.h" #include "kudu/util/threadpool.h" #define PUSH_PREPEND_NOT_OK(s, statuses, msg) do { \ @@ -421,8 +418,13 @@ Status Ksck::Run() { "table consistency check error"); if (FLAGS_checksum_scan) { - PUSH_PREPEND_NOT_OK(ChecksumData(KsckChecksumOptions()), - results_.error_messages, "checksum scan error"); + // Copy the filters because they are passed by-value. + auto table_filters_for_checksum_opts = table_filters_; + auto tablet_id_filters_for_checksum_opts = tablet_id_filters_; + PUSH_PREPEND_NOT_OK( + ChecksumData(KsckChecksumOptions(std::move(table_filters_for_checksum_opts), + std::move(tablet_id_filters_for_checksum_opts))), + results_.error_messages, "checksum scan error"); } // Use a special-case error if there are auth errors. This makes it harder @@ -531,180 +533,13 @@ Status Ksck::CheckTablesConsistency() { } Status Ksck::ChecksumData(const KsckChecksumOptions& opts) { - // Copy options so that local modifications can be made and passed on. 
- KsckChecksumOptions options = opts; - - typedef unordered_map<shared_ptr<KsckTablet>, shared_ptr<KsckTable>> TabletTableMap; - TabletTableMap tablet_table_map; - - int num_tables = 0; - int num_tablets = 0; - int num_tablet_replicas = 0; - for (const shared_ptr<KsckTable>& table : cluster_->tables()) { - VLOG(1) << "Table: " << table->name(); - if (!MatchesAnyPattern(table_filters_, table->name())) continue; - num_tables += 1; - num_tablets += table->tablets().size(); - for (const shared_ptr<KsckTablet>& tablet : table->tablets()) { - VLOG(1) << "Tablet: " << tablet->id(); - if (!MatchesAnyPattern(tablet_id_filters_, tablet->id())) continue; - InsertOrDie(&tablet_table_map, tablet, table); - num_tablet_replicas += tablet->replicas().size(); - } - } - - if (num_tables == 0) { - string msg = "No table found."; - if (!table_filters_.empty()) { - msg += " Filter: table_filters=" + JoinStrings(table_filters_, ","); - } - return Status::NotFound(msg); - } - - if (num_tablets > 0 && num_tablet_replicas == 0) { - // Warn if the table has tablets, but no replicas. The table may have no - // tablets if all range partitions have been dropped. - string msg = "No tablet replicas found."; - if (!table_filters_.empty() || !tablet_id_filters_.empty()) { - msg += " Filter: "; - if (!table_filters_.empty()) { - msg += "table_filters=" + JoinStrings(table_filters_, ","); - } - if (!tablet_id_filters_.empty()) { - msg += "tablet_id_filters=" + JoinStrings(tablet_id_filters_, ","); - } - } - return Status::NotFound(msg); - } - - // Map of tablet servers to tablet queue. - typedef unordered_map<shared_ptr<KsckTabletServer>, SharedTabletQueue> TabletServerQueueMap; - - TabletServerQueueMap tablet_server_queues; - scoped_refptr<ChecksumResultReporter> reporter(new ChecksumResultReporter(num_tablet_replicas)); - - // Create a queue of checksum callbacks grouped by the tablet server. 
- for (const TabletTableMap::value_type& entry : tablet_table_map) { - const shared_ptr<KsckTablet>& tablet = entry.first; - const shared_ptr<KsckTable>& table = entry.second; - for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) { - const shared_ptr<KsckTabletServer>& ts = - FindOrDie(cluster_->tablet_servers(), replica->ts_uuid()); - - const SharedTabletQueue& queue = - LookupOrInsertNewSharedPtr(&tablet_server_queues, ts, num_tablet_replicas); - CHECK_EQ(QUEUE_SUCCESS, queue->Put(make_pair(table->schema(), tablet->id()))); - } - } - - if (options.use_snapshot && - options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) { - // Set the snapshot timestamp to the current timestamp of the first healthy tablet server - // we can find. - for (const auto& ts : tablet_server_queues) { - if (ts.first->is_healthy()) { - options.snapshot_timestamp = ts.first->current_timestamp(); - break; - } - } - if (options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) { - return Status::ServiceUnavailable( - "No tablet servers were available to fetch the current timestamp"); - } - results_.checksum_results.snapshot_timestamp = options.snapshot_timestamp; - } - - // Kick off checksum scans in parallel. For each tablet server, we start - // scan_concurrency scans. Each callback then initiates one additional - // scan when it returns if the queue for that TS is not empty. - for (const TabletServerQueueMap::value_type& entry : tablet_server_queues) { - const shared_ptr<KsckTabletServer>& tablet_server = entry.first; - const SharedTabletQueue& queue = entry.second; - queue->Shutdown(); // Ensures that BlockingGet() will not block. 
- for (int i = 0; i < options.scan_concurrency; i++) { - std::pair<Schema, std::string> table_tablet; - if (queue->BlockingGet(&table_tablet)) { - const Schema& table_schema = table_tablet.first; - const std::string& tablet_id = table_tablet.second; - auto* cbs = new TabletServerChecksumCallbacks( - reporter, tablet_server, queue, tablet_id, options); - // 'cbs' deletes itself when complete. - tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, cbs); - } - } - } - - // Don't ruin JSON output by printing progress updates. + KsckChecksummer checksummer(cluster_.get()); + auto* checksum_results = &results_.checksum_results; + // Don't ruin JSON output with ad hoc progress updates. auto* out_for_progress_updates = IsNonJSONFormat() ? out_ : nullptr; - bool timed_out = !reporter->WaitFor(options.timeout, out_for_progress_updates); - - // Even if we timed out, for printing collate the checksum results that we did get. - ChecksumResultReporter::TabletResultMap checksums = reporter->checksums(); - - int num_errors = 0; - int num_mismatches = 0; - int num_results = 0; - KsckTableChecksumMap checksum_tables; - for (const shared_ptr<KsckTable>& table : cluster_->tables()) { - KsckTableChecksum table_checksum; - for (const shared_ptr<KsckTablet>& tablet : table->tablets()) { - if (ContainsKey(checksums, tablet->id())) { - KsckTabletChecksum tablet_checksum; - tablet_checksum.tablet_id = tablet->id(); - bool seen_first_replica = false; - uint64_t first_checksum = 0; - - for (const auto& r : FindOrDie(checksums, tablet->id())) { - KsckReplicaChecksum replica_checksum; - const string& replica_uuid = r.first; - shared_ptr<KsckTabletServer> ts = FindOrDie(cluster_->tablet_servers(), replica_uuid); - replica_checksum.ts_uuid = ts->uuid(); - replica_checksum.ts_address = ts->address(); - - const ChecksumResultReporter::ResultPair& result = r.second; - const Status& status = result.first; - replica_checksum.checksum = result.second; - replica_checksum.status = 
status; - if (!status.ok()) { - num_errors++; - } else if (!seen_first_replica) { - seen_first_replica = true; - first_checksum = replica_checksum.checksum; - } else if (replica_checksum.checksum != first_checksum) { - num_mismatches++; - tablet_checksum.mismatch = true; - } - num_results++; - InsertOrDie(&tablet_checksum.replica_checksums, - replica_checksum.ts_uuid, - std::move(replica_checksum)); - } - InsertOrDie(&table_checksum, - tablet_checksum.tablet_id, - std::move(tablet_checksum)); - } - } - InsertOrDie(&checksum_tables, table->name(), std::move(table_checksum)); - } - results_.checksum_results.tables.swap(checksum_tables); - if (timed_out) { - return Status::TimedOut(Substitute("Checksum scan did not complete within the timeout of $0: " - "Received results for $1 out of $2 expected replicas", - options.timeout.ToString(), num_results, - num_tablet_replicas)); - } - CHECK_EQ(num_results, num_tablet_replicas) - << Substitute("Unexpected error: only got $0 out of $1 replica results", - num_results, num_tablet_replicas); - - if (num_mismatches != 0) { - return Status::Corruption(Substitute("$0 checksum mismatches were detected.", num_mismatches)); - } - if (num_errors != 0) { - return Status::Aborted(Substitute("$0 errors were detected", num_errors)); - } - - return Status::OK(); + return checksummer.ChecksumData(opts, + checksum_results, + out_for_progress_updates); } bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) { http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h index 8e5e0b2..c654be0 100644 --- a/src/kudu/tools/ksck.h +++ b/src/kudu/tools/ksck.h @@ -412,15 +412,15 @@ class KsckCluster { // The table's tablet list is modified only if this method returns OK. 
virtual Status RetrieveTabletsList(const std::shared_ptr<KsckTable>& table) = 0; - const MasterList& masters() { + const MasterList& masters() const { return masters_; } - const TSMap& tablet_servers() { + const TSMap& tablet_servers() const { return tablet_servers_; } - const std::vector<std::shared_ptr<KsckTable>>& tables() { + const std::vector<std::shared_ptr<KsckTable>>& tables() const { return tables_; } http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck_checksum.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck_checksum.cc b/src/kudu/tools/ksck_checksum.cc index 9d2b289..70d7c47 100644 --- a/src/kudu/tools/ksck_checksum.cc +++ b/src/kudu/tools/ksck_checksum.cc @@ -20,22 +20,30 @@ #include <algorithm> #include <cstdint> #include <iostream> +#include <map> #include <string> #include <unordered_map> #include <utility> +#include <vector> +#include <boost/optional/optional.hpp> #include <gflags/gflags.h> +#include <glog/logging.h> #include "kudu/common/schema.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/strings/human_readable.h" +#include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/tools/ksck.h" +#include "kudu/tools/tool_action_common.h" using std::endl; +using std::ostream; using std::shared_ptr; using std::string; using std::unordered_map; +using std::vector; using strings::Substitute; DEFINE_int32(checksum_timeout_sec, 3600, @@ -53,17 +61,40 @@ namespace kudu { namespace tools { KsckChecksumOptions::KsckChecksumOptions() - : timeout(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec)), - scan_concurrency(FLAGS_checksum_scan_concurrency), - use_snapshot(FLAGS_checksum_snapshot), - snapshot_timestamp(FLAGS_checksum_snapshot_timestamp) {} + : KsckChecksumOptions({}, {}) {} -KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout, int scan_concurrency, - bool use_snapshot, uint64_t snapshot_timestamp) 
+KsckChecksumOptions::KsckChecksumOptions(vector<string> table_filters, + vector<string> tablet_id_filters) + : KsckChecksumOptions(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec), + FLAGS_checksum_scan_concurrency, + FLAGS_checksum_snapshot, + FLAGS_checksum_snapshot_timestamp, + std::move(table_filters), + std::move(tablet_id_filters)) {} + +KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout, + int scan_concurrency, + bool use_snapshot, + uint64_t snapshot_timestamp) + : KsckChecksumOptions(timeout, + scan_concurrency, + use_snapshot, + snapshot_timestamp, + {}, + {}) {} + +KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout, + int scan_concurrency, + bool use_snapshot, + uint64_t snapshot_timestamp, + vector<string> table_filters, + vector<string> tablet_id_filters) : timeout(timeout), scan_concurrency(scan_concurrency), use_snapshot(use_snapshot), - snapshot_timestamp(snapshot_timestamp) {} + snapshot_timestamp(snapshot_timestamp), + table_filters(std::move(table_filters)), + tablet_id_filters(std::move(tablet_id_filters)) {} ChecksumResultReporter::ChecksumResultReporter(int num_tablet_replicas) : expected_count_(num_tablet_replicas), @@ -153,5 +184,223 @@ void TabletServerChecksumCallbacks::Finished(const Status& status, uint64_t chec } } +KsckChecksummer::KsckChecksummer(KsckCluster* cluster) + : cluster_(CHECK_NOTNULL(cluster)) {} + +Status KsckChecksummer::BuildTabletTableMap( + const KsckChecksumOptions& opts, + KsckChecksummer::TabletTableMap* tablet_table_map, + int* num_replicas) const { + CHECK(tablet_table_map); + CHECK(num_replicas); + + TabletTableMap tablet_table_map_tmp; + int num_tables = 0; + int num_tablets = 0; + int num_replicas_tmp = 0; + for (const shared_ptr<KsckTable>& table : cluster_->tables()) { + VLOG(1) << "Table: " << table->name(); + if (!MatchesAnyPattern(opts.table_filters, table->name())) continue; + num_tables += 1; + num_tablets += table->tablets().size(); + for (const shared_ptr<KsckTablet>& tablet : 
table->tablets()) { + VLOG(1) << "Tablet: " << tablet->id(); + if (!MatchesAnyPattern(opts.tablet_id_filters, tablet->id())) continue; + InsertOrDie(&tablet_table_map_tmp, tablet, table); + num_replicas_tmp += tablet->replicas().size(); + } + } + + if (num_tables == 0) { + string msg = "No table found."; + if (!opts.table_filters.empty()) { + msg += " Filter: table_filters=" + JoinStrings(opts.table_filters, ","); + } + return Status::NotFound(msg); + } + + if (num_tablets > 0 && num_replicas_tmp == 0) { + // Warn if the table has tablets, but no replicas. The table may have no + // tablets if all range partitions have been dropped. + string msg = "No tablet replicas found."; + if (!opts.table_filters.empty() || !opts.tablet_id_filters.empty()) { + msg += " Filter:"; + if (!opts.table_filters.empty()) { + msg += " table_filters=" + JoinStrings(opts.table_filters, ","); + } + if (!opts.tablet_id_filters.empty()) { + msg += " tablet_id_filters=" + JoinStrings(opts.tablet_id_filters, ","); + } + } + return Status::NotFound(msg); + } + + *tablet_table_map = std::move(tablet_table_map_tmp); + *num_replicas = num_replicas_tmp; + return Status::OK(); +} + +Status KsckChecksummer::CollateChecksumResults( + const ChecksumResultReporter::TabletResultMap& checksums, + KsckTableChecksumMap* table_checksum_map, + int* num_results) const { + CHECK(table_checksum_map); + CHECK(num_results); + + table_checksum_map->clear(); + *num_results = 0; + int num_errors = 0; + int num_mismatches = 0; + for (const auto& table : cluster_->tables()) { + KsckTableChecksum table_checksum; + for (const auto& tablet : table->tablets()) { + if (ContainsKey(checksums, tablet->id())) { + KsckTabletChecksum tablet_checksum; + tablet_checksum.tablet_id = tablet->id(); + bool seen_first_replica = false; + uint64_t first_checksum = 0; + + for (const auto& r : FindOrDie(checksums, tablet->id())) { + KsckReplicaChecksum replica_checksum; + const auto& replica_uuid = r.first; + const auto& ts = 
FindOrDie(cluster_->tablet_servers(), replica_uuid); + replica_checksum.ts_uuid = ts->uuid(); + replica_checksum.ts_address = ts->address(); + + const ChecksumResultReporter::ResultPair& result = r.second; + const Status& status = result.first; + replica_checksum.checksum = result.second; + replica_checksum.status = status; + if (!status.ok()) { + num_errors++; + } else if (!seen_first_replica) { + seen_first_replica = true; + first_checksum = replica_checksum.checksum; + } else if (replica_checksum.checksum != first_checksum) { + num_mismatches++; + tablet_checksum.mismatch = true; + } + (*num_results)++; + EmplaceOrDie(&tablet_checksum.replica_checksums, + replica_checksum.ts_uuid, + std::move(replica_checksum)); + } + EmplaceOrDie(&table_checksum, + tablet_checksum.tablet_id, + std::move(tablet_checksum)); + } + } + EmplaceOrDie(table_checksum_map, table->name(), std::move(table_checksum)); + } + + if (num_mismatches != 0) { + return Status::Corruption(Substitute("$0 checksum mismatches were detected.", + num_mismatches)); + } + if (num_errors != 0) { + return Status::Aborted(Substitute("$0 errors were detected", num_errors)); + } + return Status::OK(); +} + +Status KsckChecksummer::ChecksumData(const KsckChecksumOptions& opts, + KsckChecksumResults* checksum_results, + ostream* out_for_progress_updates) { + CHECK(checksum_results); + + // Make a copy of 'opts' because we may need to assign a snapshot timestamp + // if one was not provided. + KsckChecksumOptions options = opts; + + // Clear the contents of 'checksum_results' because we always overwrite it + // with whatever results are obtained (and with nothing if there's no results). + checksum_results->snapshot_timestamp = boost::none; + checksum_results->tables.clear(); + + TabletTableMap tablet_table_map; + int num_replicas; + RETURN_NOT_OK(BuildTabletTableMap(options, &tablet_table_map, &num_replicas)); + + // Map of tablet servers to tablet queue. 
+ typedef unordered_map<shared_ptr<KsckTabletServer>, SharedTabletQueue> TabletServerQueueMap; + + TabletServerQueueMap tablet_server_queues; + scoped_refptr<ChecksumResultReporter> reporter( + new ChecksumResultReporter(num_replicas)); + + // Create a queue of checksum callbacks grouped by the tablet server. + for (const auto& entry : tablet_table_map) { + const shared_ptr<KsckTablet>& tablet = entry.first; + const shared_ptr<KsckTable>& table = entry.second; + for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) { + const shared_ptr<KsckTabletServer>& ts = + FindOrDie(cluster_->tablet_servers(), replica->ts_uuid()); + + const SharedTabletQueue& queue = + LookupOrInsertNewSharedPtr(&tablet_server_queues, ts, num_replicas); + CHECK_EQ(QUEUE_SUCCESS, queue->Put(make_pair(table->schema(), tablet->id()))); + } + } + + // Set the snapshot timestamp. If the sentinel value 'kCurrentTimestamp' was + // provided, the snapshot timestamp is set to the current timestamp of some + // healthy tablet server. + if (options.use_snapshot && + options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) { + for (const auto& ts : tablet_server_queues) { + if (ts.first->is_healthy()) { + options.snapshot_timestamp = ts.first->current_timestamp(); + break; + } + } + if (options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) { + return Status::ServiceUnavailable( + "No tablet servers were available to fetch the current timestamp"); + } + checksum_results->snapshot_timestamp = options.snapshot_timestamp; + } + + // Kick off checksum scans in parallel. For each tablet server, we start + // 'options.scan_concurrency' scans. Each callback then initiates one + // additional scan when it returns if the queue for that TS is not empty. 
+ for (const auto& entry : tablet_server_queues) { + const shared_ptr<KsckTabletServer>& tablet_server = entry.first; + const SharedTabletQueue& queue = entry.second; + queue->Shutdown(); // Ensures that BlockingGet() will not block. + for (int i = 0; i < options.scan_concurrency; i++) { + std::pair<Schema, std::string> table_tablet; + if (queue->BlockingGet(&table_tablet)) { + const Schema& table_schema = table_tablet.first; + const std::string& tablet_id = table_tablet.second; + auto* cbs = new TabletServerChecksumCallbacks( + reporter, tablet_server, queue, tablet_id, options); + // 'cbs' deletes itself when complete. + tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, cbs); + } + } + } + + bool timed_out = !reporter->WaitFor(options.timeout, out_for_progress_updates); + + // Even if we timed out, collate the checksum results that we did get. + KsckTableChecksumMap checksum_table_map; + int num_results; + const Status s = CollateChecksumResults(reporter->checksums(), + &checksum_table_map, + &num_results); + checksum_results->tables = std::move(checksum_table_map); + + if (timed_out) { + return Status::TimedOut(Substitute("Checksum scan did not complete within the timeout of $0: " + "Received results for $1 out of $2 expected replicas", + options.timeout.ToString(), num_results, + num_replicas)); + } + CHECK_EQ(num_results, num_replicas) + << Substitute("Unexpected error: only got $0 out of $1 replica results", + num_results, num_replicas); + return s; +} + } // namespace tools } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck_checksum.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck_checksum.h b/src/kudu/tools/ksck_checksum.h index 7285f92..8fbb2f1 100644 --- a/src/kudu/tools/ksck_checksum.h +++ b/src/kudu/tools/ksck_checksum.h @@ -24,8 +24,11 @@ #include <string> #include <unordered_map> #include <utility> +#include <vector> 
+#include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" +#include "kudu/tools/ksck_results.h" #include "kudu/util/atomic.h" #include "kudu/util/blocking_queue.h" #include "kudu/util/countdown_latch.h" @@ -39,6 +42,9 @@ class Schema; namespace tools { +class KsckCluster; +class KsckTable; +class KsckTablet; class KsckTabletServer; // Options for checksum scans. @@ -49,11 +55,21 @@ struct KsckChecksumOptions { KsckChecksumOptions(); + KsckChecksumOptions(std::vector<std::string> table_filters, + std::vector<std::string> tablet_id_filters); + KsckChecksumOptions(MonoDelta timeout, int scan_concurrency, bool use_snapshot, uint64_t snapshot_timestamp); + KsckChecksumOptions(MonoDelta timeout, + int scan_concurrency, + bool use_snapshot, + uint64_t snapshot_timestamp, + std::vector<std::string> table_filters, + std::vector<std::string> tablet_id_filters); + // The maximum total time to wait for results to come back from all replicas. MonoDelta timeout; @@ -65,6 +81,11 @@ struct KsckChecksumOptions { // The snapshot timestamp to use for snapshot checksum scans. uint64_t snapshot_timestamp; + + // Filters for the table names and tablet ids whose contents should be + // checksummed. + std::vector<std::string> table_filters; + std::vector<std::string> tablet_id_filters; }; // Interface for reporting progress on checksumming a single @@ -152,6 +173,8 @@ class TabletServerChecksumCallbacks : public KsckChecksumProgressCallbacks { void Finished(const Status& status, uint64_t checksum) override; private: + ~TabletServerChecksumCallbacks() = default; + const scoped_refptr<ChecksumResultReporter> reporter_; const std::shared_ptr<KsckTabletServer> tablet_server_; const SharedTabletQueue queue_; @@ -159,5 +182,47 @@ class TabletServerChecksumCallbacks : public KsckChecksumProgressCallbacks { std::string tablet_id_; }; + +// A class for performing checksum scans against a Kudu cluster. 
+class KsckChecksummer { + public: + // 'cluster' must remain valid as long as this instance is alive. + explicit KsckChecksummer(KsckCluster* cluster); + + // Checksum the data in the Kudu cluster according to the options provided in + // 'opts'. Results will be populated in the 'checksum_results'. If non-null, + // progress updates will be written to 'out_for_progress_updates'. + // NOTE: Even if this method returns a bad Status, 'checksum_results' will be + // populated with whatever checksum results were received. + Status ChecksumData(const KsckChecksumOptions& opts, + KsckChecksumResults* checksum_results, + std::ostream* out_for_progress_updates); + + private: + typedef std::unordered_map<std::shared_ptr<KsckTablet>, + std::shared_ptr<KsckTable>> TabletTableMap; + + // Builds a mapping from tablet-to-be-checksummed to its table, which is + // used to create checksum callbacks. This mapping is returned in + // 'tablet_table_map' and the total number of replicas to be checksummed is + // returned in 'num_replicas'. + Status BuildTabletTableMap(const KsckChecksumOptions& opts, + TabletTableMap* tablet_table_map, + int* num_replicas) const; + + // Collates the results of checksums into 'table_checksum_map', with the + // total number of results returned as 'num_results'. + // NOTE: Even if this function returns a bad Status, 'table_checksum_map' + // and 'num_results' will still be populated using whatever results are + // available. + Status CollateChecksumResults( + const ChecksumResultReporter::TabletResultMap& checksums, + KsckTableChecksumMap* table_checksum_map, + int* num_results) const; + + KsckCluster* cluster_; + + DISALLOW_COPY_AND_ASSIGN(KsckChecksummer); +}; } // namespace tools } // namespace kudu
